package io.confluent.kafka.link.integration;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.link.ClusterLinkAsyncTaskIntegrationTest;
import kafka.link.ClusterLinkTestHarness;
import kafka.server.link.ClusterLinkConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.mutable.HashMap;

@Tags({@Tag("integration"), @Tag("bazel:shard_count:8")})
/* loaded from: input_file:io/confluent/kafka/link/integration/ClusterLinkPrefixAsyncTaskIntegrationTest.class */
public class ClusterLinkPrefixAsyncTaskIntegrationTest extends ClusterLinkAsyncTaskIntegrationTest {
    Long offsetToCommit = 10L;
    Long syncPeriod = 100L;
    String consumerGroup = "testGroup";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/link/integration/ClusterLinkPrefixAsyncTaskIntegrationTest$BidirectionalStaticTestState.class */
    public static class BidirectionalStaticTestState {
        ClusterLinkTestHarness eastCluster;
        ClusterLinkTestHarness westCluster;
        String eastTopic;
        String westTopic;
        String eastNonMirrorTopic;
        String westNonMirrorTopic;
        String eastGroup;
        String westGroup;
        String eastPrefix;
        String westPrefix;
        String eastRecordKeyPrefix;
        String westRecordKeyPrefix;
        KafkaProducer<byte[], byte[]> eastProducer;
        KafkaProducer<byte[], byte[]> westProducer;
        int numRecords;

        public BidirectionalStaticTestState(ClusterLinkTestHarness clusterLinkTestHarness, ClusterLinkTestHarness clusterLinkTestHarness2, String str, String str2, String str3, String str4, String str5, String str6, String str7, String str8, String str9, String str10, KafkaProducer<byte[], byte[]> kafkaProducer, KafkaProducer<byte[], byte[]> kafkaProducer2, int i) {
            this.eastCluster = clusterLinkTestHarness;
            this.westCluster = clusterLinkTestHarness2;
            this.eastTopic = str;
            this.westTopic = str2;
            this.eastNonMirrorTopic = str3;
            this.westNonMirrorTopic = str4;
            this.eastGroup = str5;
            this.westGroup = str6;
            this.eastPrefix = str7;
            this.westPrefix = str8;
            this.eastRecordKeyPrefix = str9;
            this.westRecordKeyPrefix = str10;
            this.eastProducer = kafkaProducer;
            this.westProducer = kafkaProducer2;
            this.numRecords = i;
        }
    }

    public ClusterLinkPrefixAsyncTaskIntegrationTest() {
        clusterLinkPrefix_$eq("src_");
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationTaskStateGoesIntoErrorWhenTheresAnExistingConsumerGroup(String str, boolean z) {
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTaskDescriptionsForTwoLinksOnDifferentLinkCoordinators(String str, boolean z) {
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testIntervalChangeForPeriodicTasks(String str, boolean z) {
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testOffsetMigrationTaskStateManagementVariousScenarios(String str, boolean z) {
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testSyncTopicsConfigsTaskState(String str, boolean z) {
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testTaskStateForVariousClusterLinkingTasks(String str, boolean z) {
    }

    @Disabled
    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testAddPartitionsWithSourceLeaderEpochBump(String str, boolean z) {
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterLinkPrefixAddedToConsumerOffsets(String str, boolean z) {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), sourceCluster().listenerName(), sourceCluster().adminClientConfig());
        Properties destLinkProps = destLinkProps(ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()));
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(this.consumerGroup));
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        destLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), "link1_");
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig.LinkModeProp(), "DESTINATION");
        Assertions.assertFalse("link1_".isEmpty());
        createClusterLink(linkName(), destLinkProps, sourceLinkProps(ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap())), false, true);
        destCluster().linkTopic(topic(), (short) 2, linkName(), ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), "link1_");
        commitOffsets(sourceCluster(), topic(), 0, this.offsetToCommit.longValue(), this.consumerGroup);
        Assertions.assertTrue(((String) JavaConverters.asJavaCollection(destCluster().listTopics()).stream().findFirst().get()).startsWith("link1_"));
        Assertions.assertTrue(((String) JavaConverters.asJavaCollection(destCluster().listConsumerGroups()).stream().findFirst().get()).startsWith("link1_"));
        destCluster().unlinkTopic("link1_" + topic(), linkName(), true, true, true, numPartitions());
        destCluster().deleteClusterLink(linkName(), false, destCluster().brokers().toSeq());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testClusterLinkPrefixNotAddedToConsumerOffsets(String str, boolean z) {
        sourceCluster().createTopic(topic(), numPartitions(), replicationFactor(), new Properties(), sourceCluster().listenerName(), sourceCluster().adminClientConfig());
        Properties destLinkProps = destLinkProps(ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()));
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(this.consumerGroup));
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        destLinkProps.setProperty(ClusterLinkConfig.ClusterLinkPrefixProp(), "link1_");
        destLinkProps.setProperty(ClusterLinkConfig.ConsumerGroupPrefixEnableProp(), "false");
        Assertions.assertFalse("link1_".isEmpty());
        createClusterLink(linkName(), destLinkProps, sourceLinkProps(ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap())), false, true);
        destCluster().linkTopic(topic(), (short) 2, linkName(), ClusterLinkTestUtils.convertMapToScalaMap(Collections.emptyMap()), "link1_");
        commitOffsets(sourceCluster(), topic(), 0, this.offsetToCommit.longValue(), this.consumerGroup);
        Assertions.assertTrue(((String) JavaConverters.asJavaCollection(destCluster().listTopics()).stream().findFirst().get()).startsWith("link1_"));
        Assertions.assertFalse(((String) JavaConverters.asJavaCollection(destCluster().listConsumerGroups()).stream().findFirst().get()).startsWith("link1_"));
        destCluster().unlinkTopic("link1_" + topic(), linkName(), true, true, true, numPartitions());
        destCluster().deleteClusterLink(linkName(), false, destCluster().brokers().toSeq());
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalConsumerOffsetSyncingWithSameTopicOnBothSidesOfLinkAndConsumerGroupPrefixDisabled(String str, boolean z) throws ExecutionException, InterruptedException {
        testBidirectionalConsumerOffsetSyncing("clicks", "east.group", "west.", "clicks", "west.group", "east.");
    }

    @MethodSource({"allCombinations"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.coordinator={1}")
    public void testBidirectionalConsumerOffsetSyncingWithDifferentTopicsOnEachSideOfLinkAndConsumerGroupPrefixDisabled(String str, boolean z) throws ExecutionException, InterruptedException {
        testBidirectionalConsumerOffsetSyncing("east.clicks", "east.group", "west.", "west.clicks", "west.group", "east.");
    }

    private void testBidirectionalConsumerOffsetSyncing(String str, String str2, String str3, String str4, String str5, String str6) throws ExecutionException, InterruptedException {
        ClusterLinkTestHarness destCluster = destCluster();
        KafkaProducer<byte[], byte[]> createProducer = destCluster.createProducer(new ByteArraySerializer(), new ByteArraySerializer(), new Properties());
        ClusterLinkTestHarness sourceCluster = sourceCluster();
        KafkaProducer<byte[], byte[]> createProducer2 = sourceCluster.createProducer(new ByteArraySerializer(), new ByteArraySerializer(), new Properties());
        BidirectionalStaticTestState bidirectionalStaticTestState = new BidirectionalStaticTestState(destCluster, sourceCluster, str, str4, "east-non-mirror", "west-non-mirror", str2, str5, str3, str6, "east-key ", "west-key ", createProducer, createProducer2, 20);
        initializeTopics(bidirectionalStaticTestState, createProducer, createProducer2, 20);
        Properties linkOffsetSyncProps = linkOffsetSyncProps(str2, str6, destCluster);
        createClusterLink(linkName(), linkOffsetSyncProps(str5, str3, sourceCluster), Option.apply(linkOffsetSyncProps), false, true);
        linkTopics(bidirectionalStaticTestState);
        waitForMirrors(bidirectionalStaticTestState, expectedOffsets(sourceCluster, str4, str3), expectedOffsets(destCluster, str, str6));
        produceToTopics(bidirectionalStaticTestState);
        waitForMirrors(bidirectionalStaticTestState, expectedOffsets(sourceCluster, str4, str3), expectedOffsets(destCluster, str, str6));
        verifyMirrorAndNonMirrorOffsetMigration(commitOffsets(bidirectionalStaticTestState).longValue(), bidirectionalStaticTestState);
        unlinkMirrors(bidirectionalStaticTestState);
        verifyConsumption(bidirectionalStaticTestState);
        deleteLinks(bidirectionalStaticTestState);
    }

    private void waitForMirrors(BidirectionalStaticTestState bidirectionalStaticTestState, Map<TopicPartition, Object> map, Map<TopicPartition, Object> map2) {
        List<TopicPartition> list = topicPartitionList(bidirectionalStaticTestState.westTopic, bidirectionalStaticTestState.eastPrefix);
        List<TopicPartition> list2 = topicPartitionList(bidirectionalStaticTestState.eastTopic, bidirectionalStaticTestState.westPrefix);
        waitForMirrorPartitions(ClusterLinkTestUtils.convertListToScalaSeq(list), ClusterLinkTestUtils.convertMapToScalaMap(map), bidirectionalStaticTestState.eastCluster.brokers(), 15000L, bidirectionalStaticTestState.eastPrefix);
        waitForMirrorPartitions(ClusterLinkTestUtils.convertListToScalaSeq(list2), ClusterLinkTestUtils.convertMapToScalaMap(map2), bidirectionalStaticTestState.westCluster.brokers(), 15000L, bidirectionalStaticTestState.westPrefix);
    }

    private void linkTopics(BidirectionalStaticTestState bidirectionalStaticTestState) throws ExecutionException, InterruptedException {
        bidirectionalStaticTestState.eastCluster.linkTopic(bidirectionalStaticTestState.westTopic, replicationFactor(), linkName(), new HashMap(), bidirectionalStaticTestState.eastPrefix).all().get();
        bidirectionalStaticTestState.westCluster.linkTopic(bidirectionalStaticTestState.eastTopic, replicationFactor(), linkName(), new HashMap(), bidirectionalStaticTestState.westPrefix).all().get();
    }

    private void produceToTopics(BidirectionalStaticTestState bidirectionalStaticTestState) {
        produceRecords(bidirectionalStaticTestState.westProducer, bidirectionalStaticTestState.westTopic, bidirectionalStaticTestState.numRecords, obj -> {
            return bidirectionalStaticTestState.westRecordKeyPrefix + String.valueOf(obj);
        }, Option.empty(), Option.empty(), Option.empty());
        produceRecords(bidirectionalStaticTestState.eastProducer, bidirectionalStaticTestState.eastTopic, bidirectionalStaticTestState.numRecords, obj2 -> {
            return bidirectionalStaticTestState.eastRecordKeyPrefix + String.valueOf(obj2);
        }, Option.empty(), Option.empty(), Option.empty());
    }

    private Long commitOffsets(BidirectionalStaticTestState bidirectionalStaticTestState) {
        Long valueOf = Long.valueOf(bidirectionalStaticTestState.eastCluster.leaderLog(new TopicPartition(bidirectionalStaticTestState.eastTopic, 0)).logEndOffset());
        commitOffsets(bidirectionalStaticTestState.eastCluster, bidirectionalStaticTestState.eastTopic, 0, valueOf.longValue(), bidirectionalStaticTestState.eastGroup);
        commitOffsets(bidirectionalStaticTestState.eastCluster, bidirectionalStaticTestState.eastPrefix + bidirectionalStaticTestState.westTopic, 0, valueOf.longValue(), bidirectionalStaticTestState.eastGroup);
        commitOffsets(bidirectionalStaticTestState.eastCluster, bidirectionalStaticTestState.eastNonMirrorTopic, 0, valueOf.longValue(), bidirectionalStaticTestState.eastGroup);
        commitOffsets(bidirectionalStaticTestState.westCluster, bidirectionalStaticTestState.westTopic, 0, valueOf.longValue(), bidirectionalStaticTestState.westGroup);
        commitOffsets(bidirectionalStaticTestState.westCluster, bidirectionalStaticTestState.westPrefix + bidirectionalStaticTestState.eastTopic, 0, valueOf.longValue(), bidirectionalStaticTestState.westGroup);
        commitOffsets(bidirectionalStaticTestState.westCluster, bidirectionalStaticTestState.westNonMirrorTopic, 0, valueOf.longValue(), bidirectionalStaticTestState.westGroup);
        return valueOf;
    }

    private void verifyConsumption(BidirectionalStaticTestState bidirectionalStaticTestState) {
        List<TopicPartition> list = topicPartitionList(bidirectionalStaticTestState.westTopic, "");
        List<TopicPartition> list2 = topicPartitionList(bidirectionalStaticTestState.eastTopic, "");
        List<TopicPartition> list3 = topicPartitionList(bidirectionalStaticTestState.westTopic, bidirectionalStaticTestState.eastPrefix);
        List<TopicPartition> list4 = topicPartitionList(bidirectionalStaticTestState.eastTopic, bidirectionalStaticTestState.westPrefix);
        verifyConsumption(bidirectionalStaticTestState.eastCluster, list3, list, bidirectionalStaticTestState.eastPrefix, "west-");
        verifyConsumption(bidirectionalStaticTestState.westCluster, list4, list2, bidirectionalStaticTestState.westPrefix, "east-");
    }

    private void unlinkMirrors(BidirectionalStaticTestState bidirectionalStaticTestState) {
        bidirectionalStaticTestState.eastCluster.unlinkTopic(bidirectionalStaticTestState.eastPrefix + bidirectionalStaticTestState.westTopic, linkName(), true, true, true, numPartitions());
        bidirectionalStaticTestState.westCluster.unlinkTopic(bidirectionalStaticTestState.westPrefix + bidirectionalStaticTestState.eastTopic, linkName(), true, true, true, numPartitions());
    }

    private void deleteLinks(BidirectionalStaticTestState bidirectionalStaticTestState) {
        bidirectionalStaticTestState.eastCluster.deleteClusterLink(linkName(), false, bidirectionalStaticTestState.eastCluster.aliveBrokers().toSeq());
        bidirectionalStaticTestState.westCluster.deleteClusterLink(linkName(), false, bidirectionalStaticTestState.westCluster.aliveBrokers().toSeq());
    }

    private List<TopicPartition> topicPartitionList(String str, String str2) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < numPartitions(); i++) {
            arrayList.add(new TopicPartition(str2 + str, i));
        }
        return arrayList;
    }

    private void initializeTopics(BidirectionalStaticTestState bidirectionalStaticTestState, KafkaProducer<byte[], byte[]> kafkaProducer, KafkaProducer<byte[], byte[]> kafkaProducer2, int i) {
        bidirectionalStaticTestState.westCluster.createTopic(bidirectionalStaticTestState.westTopic, numPartitions(), replicationFactor(), new Properties(), bidirectionalStaticTestState.westCluster.listenerName(), bidirectionalStaticTestState.westCluster.adminClientConfig());
        bidirectionalStaticTestState.eastCluster.createTopic(bidirectionalStaticTestState.eastTopic, numPartitions(), replicationFactor(), new Properties(), bidirectionalStaticTestState.eastCluster.listenerName(), bidirectionalStaticTestState.eastCluster.adminClientConfig());
        produceRecords(kafkaProducer2, bidirectionalStaticTestState.westTopic, i, obj -> {
            return bidirectionalStaticTestState.westRecordKeyPrefix + String.valueOf(obj);
        }, Option.empty(), Option.empty(), Option.empty());
        produceRecords(kafkaProducer, bidirectionalStaticTestState.eastTopic, i, obj2 -> {
            return bidirectionalStaticTestState.eastRecordKeyPrefix + String.valueOf(obj2);
        }, Option.empty(), Option.empty(), Option.empty());
        bidirectionalStaticTestState.westCluster.createTopic(bidirectionalStaticTestState.westNonMirrorTopic, numPartitions(), replicationFactor(), new Properties(), bidirectionalStaticTestState.westCluster.listenerName(), bidirectionalStaticTestState.westCluster.adminClientConfig());
        bidirectionalStaticTestState.eastCluster.createTopic(bidirectionalStaticTestState.eastNonMirrorTopic, numPartitions(), replicationFactor(), new Properties(), bidirectionalStaticTestState.eastCluster.listenerName(), bidirectionalStaticTestState.eastCluster.adminClientConfig());
        produceRecords(kafkaProducer2, bidirectionalStaticTestState.westNonMirrorTopic, i, obj3 -> {
            return bidirectionalStaticTestState.westRecordKeyPrefix + String.valueOf(obj3);
        }, Option.empty(), Option.empty(), Option.empty());
        produceRecords(kafkaProducer, bidirectionalStaticTestState.eastNonMirrorTopic, i, obj4 -> {
            return bidirectionalStaticTestState.eastRecordKeyPrefix + String.valueOf(obj4);
        }, Option.empty(), Option.empty(), Option.empty());
    }

    private Map<TopicPartition, Object> expectedOffsets(ClusterLinkTestHarness clusterLinkTestHarness, String str, String str2) {
        java.util.HashMap hashMap = new java.util.HashMap();
        for (int i = 0; i < numPartitions(); i++) {
            hashMap.put(new TopicPartition(str2 + str, i), Long.valueOf(clusterLinkTestHarness.leaderLog(new TopicPartition(str, i)).logEndOffset()));
        }
        return hashMap;
    }

    private Properties linkOffsetSyncProps(String str, String str2, ClusterLinkTestHarness clusterLinkTestHarness) {
        java.util.HashMap hashMap = new java.util.HashMap();
        hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncEnableProp(), "true");
        hashMap.put(ClusterLinkConfig.ConsumerOffsetGroupFiltersProp(), consumerGroupFilter(str));
        hashMap.put(ClusterLinkConfig.ConsumerOffsetSyncMsProp(), String.valueOf(this.syncPeriod));
        hashMap.put(ClusterLinkConfig.ClusterLinkPrefixProp(), str2);
        hashMap.put("sasl.jaas.config", createLinkCredentials(linkName(), clusterLinkTestHarness, Option.empty()));
        hashMap.put("bootstrap.servers", clusterLinkTestHarness.bootstrapServers(clusterLinkTestHarness.listenerName()));
        Properties bidirectionalProps = bidirectionalProps(hashMap);
        bidirectionalProps.putAll(clusterLinkTestHarness.clientSecurityProps(linkName()));
        return bidirectionalProps;
    }

    private void verifyMirrorAndNonMirrorOffsetMigration(long j, BidirectionalStaticTestState bidirectionalStaticTestState) {
        String str = bidirectionalStaticTestState.westGroup;
        verifyOffsetMigration(bidirectionalStaticTestState.eastPrefix + bidirectionalStaticTestState.westTopic, 0, j, str, bidirectionalStaticTestState.eastCluster);
        verifyOffsetMigration(bidirectionalStaticTestState.eastTopic, 0, j, str, bidirectionalStaticTestState.eastCluster);
        verifyOffsetMigration(bidirectionalStaticTestState.westNonMirrorTopic, 0, j, bidirectionalStaticTestState.westGroup, bidirectionalStaticTestState.westCluster);
        Assertions.assertNotEquals(j, bidirectionalStaticTestState.eastCluster.getOffset(bidirectionalStaticTestState.eastPrefix + bidirectionalStaticTestState.westNonMirrorTopic, 0, str));
        String str2 = bidirectionalStaticTestState.eastGroup;
        verifyOffsetMigration(bidirectionalStaticTestState.westPrefix + bidirectionalStaticTestState.eastTopic, 0, j, str2, bidirectionalStaticTestState.westCluster);
        verifyOffsetMigration(bidirectionalStaticTestState.westTopic, 0, j, str2, bidirectionalStaticTestState.westCluster);
        verifyOffsetMigration(bidirectionalStaticTestState.eastNonMirrorTopic, 0, j, bidirectionalStaticTestState.eastGroup, bidirectionalStaticTestState.eastCluster);
        Assertions.assertNotEquals(j, bidirectionalStaticTestState.westCluster.getOffset(bidirectionalStaticTestState.westPrefix + bidirectionalStaticTestState.eastNonMirrorTopic, 0, str2));
    }

    private Properties bidirectionalProps(Map<String, String> map) {
        Properties properties = new Properties();
        properties.setProperty(ClusterLinkConfig.LinkModeProp(), ClusterLinkConfig.LinkMode.BIDIRECTIONAL.name());
        properties.setProperty(kafka.server.link.ClusterLinkConfig.ConnectionModeProp(), "OUTBOUND");
        properties.put("metadata.max.age.ms", "2000");
        properties.put("reconnect.backoff.max.ms", "1000");
        properties.putAll(map);
        return properties;
    }

    private void verifyConsumption(ClusterLinkTestHarness clusterLinkTestHarness, List<TopicPartition> list, List<TopicPartition> list2, String str, String str2) {
        Consumer createConsumer = clusterLinkTestHarness.createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), new Properties(), ClusterLinkTestUtils.convertListToScalaSeq(new ArrayList()).toList());
        createConsumer.assign(list);
        consumePartitionRecords(createConsumer, ClusterLinkTestUtils.convertSetToScalaSet(new HashSet(list2)), str, list2.get(0).topic(), str2);
        createConsumer.close();
    }
}
