package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ShareAcknowledgeRequestData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(120)
/* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaShareConsumerTest.class */
public class KafkaShareConsumerTest {
    private final String groupId = "test-group";
    private final String clientId1 = "client-id-1";
    private final String topic1 = "test1";
    private final Uuid topicId1 = Uuid.randomUuid();
    private final TopicPartition t1p0 = new TopicPartition("test1", 0);
    private final TopicIdPartition ti1p0 = new TopicIdPartition(this.topicId1, this.t1p0);
    private final Map<String, Uuid> topicIds = Map.of("test1", this.topicId1);
    private final int batchSize = 10;
    private final int heartbeatIntervalMs = 1000;
    private final Time time = new MockTime();
    private final SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.EARLIEST);

    @Test
    public void testVerifyHeartbeats() throws InterruptedException {
        ConsumerMetadata consumerMetadata = new ConsumerMetadata(0L, 0L, Long.MAX_VALUE, false, false, this.subscription, new LogContext(), new ClusterResourceListeners());
        MockClient mockClient = new MockClient(this.time, (Metadata) consumerMetadata);
        initMetadata(mockClient, Map.of("test1", 1));
        Node findCoordinator = findCoordinator(mockClient, (Node) consumerMetadata.fetch().nodes().get(0));
        AtomicReference atomicReference = new AtomicReference();
        AtomicInteger atomicInteger = new AtomicInteger();
        mockClient.prepareResponseFrom(abstractRequest -> {
            if (!(abstractRequest instanceof ShareGroupHeartbeatRequest)) {
                return false;
            }
            ShareGroupHeartbeatRequest shareGroupHeartbeatRequest = (ShareGroupHeartbeatRequest) abstractRequest;
            atomicReference.set(Uuid.fromString(shareGroupHeartbeatRequest.data().memberId()));
            boolean z = shareGroupHeartbeatRequest.data().memberEpoch() == 0;
            atomicInteger.addAndGet(1);
            mockClient.prepareResponseFrom(abstractRequest -> {
                if (!(abstractRequest instanceof ShareGroupHeartbeatRequest)) {
                    return false;
                }
                ShareGroupHeartbeatRequest shareGroupHeartbeatRequest2 = (ShareGroupHeartbeatRequest) abstractRequest;
                boolean z2 = shareGroupHeartbeatRequest2.data().memberId().equals(((Uuid) atomicReference.get()).toString()) && shareGroupHeartbeatRequest2.data().memberEpoch() == 1;
                atomicInteger.addAndGet(1);
                return z2;
            }, (AbstractResponse) shareGroupHeartbeatResponse((Uuid) atomicReference.get(), 2, this.ti1p0), findCoordinator);
            return z;
        }, (AbstractResponse) shareGroupHeartbeatResponse((Uuid) atomicReference.get(), 1, this.ti1p0), findCoordinator);
        KafkaShareConsumer<String, String> newShareConsumer = newShareConsumer("client-id-1", consumerMetadata, mockClient);
        try {
            newShareConsumer.subscribe(Set.of("test1"));
            newShareConsumer.poll(Duration.ZERO);
            Thread.sleep(1000L);
            Assertions.assertEquals(2, atomicInteger.get());
            Assertions.assertTrue(mockClient.futureResponses().isEmpty());
            newShareConsumer.close(Duration.ZERO);
            if (newShareConsumer != null) {
                newShareConsumer.close();
            }
        } catch (Throwable th) {
            if (newShareConsumer != null) {
                try {
                    newShareConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testVerifyFetchAndCommitSyncImplicit() throws InterruptedException {
        ConsumerMetadata consumerMetadata = new ConsumerMetadata(0L, 0L, Long.MAX_VALUE, false, false, this.subscription, new LogContext(), new ClusterResourceListeners());
        MockClient mockClient = new MockClient(this.time, (Metadata) consumerMetadata);
        initMetadata(mockClient, Map.of("test1", 1));
        Node node = (Node) consumerMetadata.fetch().nodes().get(0);
        AtomicBoolean shareGroupHeartbeatGenerator = shareGroupHeartbeatGenerator(mockClient, findCoordinator(mockClient, node), new AtomicReference<>(), this.ti1p0);
        mockClient.prepareResponseFrom(abstractRequest -> {
            if (!(abstractRequest instanceof ShareFetchRequest)) {
                return false;
            }
            ShareFetchRequest shareFetchRequest = (ShareFetchRequest) abstractRequest;
            return shareFetchRequest.data().groupId().equals("test-group") && shareFetchRequest.data().shareSessionEpoch() == 0 && shareFetchRequest.data().batchSize() == 10 && ((ShareFetchRequestData.FetchTopic) shareFetchRequest.data().topics().get(0)).topicId().equals(this.topicId1) && ((ShareFetchRequestData.FetchTopic) shareFetchRequest.data().topics().get(0)).partitions().size() == 1 && ((ShareFetchRequestData.FetchPartition) ((ShareFetchRequestData.FetchTopic) shareFetchRequest.data().topics().get(0)).partitions().get(0)).acknowledgementBatches().isEmpty();
        }, (AbstractResponse) shareFetchResponse(this.ti1p0, 2), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            if (!(abstractRequest2 instanceof ShareAcknowledgeRequest)) {
                return false;
            }
            ShareAcknowledgeRequest shareAcknowledgeRequest = (ShareAcknowledgeRequest) abstractRequest2;
            return shareAcknowledgeRequest.data().groupId().equals("test-group") && shareAcknowledgeRequest.data().shareSessionEpoch() == 1 && ((ShareAcknowledgeRequestData.AcknowledgementBatch) ((ShareAcknowledgeRequestData.AcknowledgePartition) ((ShareAcknowledgeRequestData.AcknowledgeTopic) shareAcknowledgeRequest.data().topics().get(0)).partitions().get(0)).acknowledgementBatches().get(0)).firstOffset() == 0 && ((ShareAcknowledgeRequestData.AcknowledgementBatch) ((ShareAcknowledgeRequestData.AcknowledgePartition) ((ShareAcknowledgeRequestData.AcknowledgeTopic) shareAcknowledgeRequest.data().topics().get(0)).partitions().get(0)).acknowledgementBatches().get(0)).lastOffset() == 1 && ((ShareAcknowledgeRequestData.AcknowledgementBatch) ((ShareAcknowledgeRequestData.AcknowledgePartition) ((ShareAcknowledgeRequestData.AcknowledgeTopic) shareAcknowledgeRequest.data().topics().get(0)).partitions().get(0)).acknowledgementBatches().get(0)).acknowledgeTypes().size() == 1 && ((Byte) ((ShareAcknowledgeRequestData.AcknowledgementBatch) ((ShareAcknowledgeRequestData.AcknowledgePartition) ((ShareAcknowledgeRequestData.AcknowledgeTopic) shareAcknowledgeRequest.data().topics().get(0)).partitions().get(0)).acknowledgementBatches().get(0)).acknowledgeTypes().get(0)).byteValue() == 1;
        }, (AbstractResponse) shareAcknowledgeResponse(this.ti1p0), node);
        mockClient.prepareResponseFrom(abstractRequest3 -> {
            if (!(abstractRequest3 instanceof ShareAcknowledgeRequest)) {
                return false;
            }
            ShareAcknowledgeRequest shareAcknowledgeRequest = (ShareAcknowledgeRequest) abstractRequest3;
            return shareAcknowledgeRequest.data().groupId().equals("test-group") && shareAcknowledgeRequest.data().shareSessionEpoch() == -1 && shareAcknowledgeRequest.data().topics().isEmpty();
        }, (AbstractResponse) shareAcknowledgeResponse(), node);
        KafkaShareConsumer<String, String> newShareConsumer = newShareConsumer("client-id-1", consumerMetadata, mockClient);
        try {
            newShareConsumer.subscribe(Set.of("test1"));
            newShareConsumer.poll(Duration.ofMillis(5000L));
            newShareConsumer.commitSync();
            newShareConsumer.close(Duration.ZERO);
            Assertions.assertTrue(shareGroupHeartbeatGenerator.get());
            Assertions.assertTrue(mockClient.futureResponses().isEmpty());
            if (newShareConsumer != null) {
                newShareConsumer.close();
            }
        } catch (Throwable th) {
            if (newShareConsumer != null) {
                try {
                    newShareConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testVerifyFetchAndCloseImplicit() throws InterruptedException {
        ConsumerMetadata consumerMetadata = new ConsumerMetadata(0L, 0L, Long.MAX_VALUE, false, false, this.subscription, new LogContext(), new ClusterResourceListeners());
        MockClient mockClient = new MockClient(this.time, (Metadata) consumerMetadata);
        initMetadata(mockClient, Map.of("test1", 1));
        Node node = (Node) consumerMetadata.fetch().nodes().get(0);
        AtomicBoolean shareGroupHeartbeatGenerator = shareGroupHeartbeatGenerator(mockClient, findCoordinator(mockClient, node), new AtomicReference<>(), this.ti1p0);
        mockClient.prepareResponseFrom(abstractRequest -> {
            if (!(abstractRequest instanceof ShareFetchRequest)) {
                return false;
            }
            ShareFetchRequest shareFetchRequest = (ShareFetchRequest) abstractRequest;
            return shareFetchRequest.data().groupId().equals("test-group") && shareFetchRequest.data().shareSessionEpoch() == 0 && shareFetchRequest.data().batchSize() == 10 && ((ShareFetchRequestData.FetchTopic) shareFetchRequest.data().topics().get(0)).topicId().equals(this.topicId1) && ((ShareFetchRequestData.FetchTopic) shareFetchRequest.data().topics().get(0)).partitions().size() == 1 && ((ShareFetchRequestData.FetchPartition) ((ShareFetchRequestData.FetchTopic) shareFetchRequest.data().topics().get(0)).partitions().get(0)).acknowledgementBatches().isEmpty();
        }, (AbstractResponse) shareFetchResponse(this.ti1p0, 2), node);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            if (!(abstractRequest2 instanceof ShareAcknowledgeRequest)) {
                return false;
            }
            ShareAcknowledgeRequest shareAcknowledgeRequest = (ShareAcknowledgeRequest) abstractRequest2;
            return shareAcknowledgeRequest.data().groupId().equals("test-group") && shareAcknowledgeRequest.data().shareSessionEpoch() == -1 && shareAcknowledgeRequest.data().topics().isEmpty();
        }, (AbstractResponse) shareAcknowledgeResponse(), node);
        KafkaShareConsumer<String, String> newShareConsumer = newShareConsumer("client-id-1", consumerMetadata, mockClient);
        try {
            newShareConsumer.subscribe(Set.of("test1"));
            newShareConsumer.poll(Duration.ofMillis(5000L));
            newShareConsumer.close(Duration.ZERO);
            Assertions.assertTrue(shareGroupHeartbeatGenerator.get());
            Assertions.assertTrue(mockClient.futureResponses().isEmpty());
            if (newShareConsumer != null) {
                newShareConsumer.close();
            }
        } catch (Throwable th) {
            if (newShareConsumer != null) {
                try {
                    newShareConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private KafkaShareConsumer<String, String> newShareConsumer(String str, ConsumerMetadata consumerMetadata, KafkaClient kafkaClient) {
        return new KafkaShareConsumer<>(new LogContext(), str, "test-group", newConsumerConfig(str), new StringDeserializer(), new StringDeserializer(), this.time, kafkaClient, this.subscription, consumerMetadata);
    }

    private ConsumerConfig newConsumerConfig(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("client.id", str);
        hashMap.put("group.id", "test-group");
        hashMap.put("key.deserializer", StringDeserializer.class);
        hashMap.put("value.deserializer", StringDeserializer.class);
        hashMap.put("max.poll.records", 10);
        return new ConsumerConfig(hashMap);
    }

    private void initMetadata(MockClient mockClient, Map<String, Integer> map) {
        HashMap hashMap = new HashMap();
        for (String str : map.keySet()) {
            hashMap.put(str, this.topicIds.get(str));
        }
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, map, hashMap));
    }

    private Node findCoordinator(MockClient mockClient, Node node) {
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-group", node), node);
        return new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
    }

    private AtomicBoolean shareGroupHeartbeatGenerator(MockClient mockClient, Node node, AtomicReference<Uuid> atomicReference, TopicIdPartition topicIdPartition) {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        shareGroupHeartbeat(mockClient, node, atomicReference, 0, topicIdPartition, new AtomicInteger(), atomicBoolean);
        return atomicBoolean;
    }

    private void shareGroupHeartbeat(MockClient mockClient, Node node, AtomicReference<Uuid> atomicReference, int i, TopicIdPartition topicIdPartition, AtomicInteger atomicInteger, AtomicBoolean atomicBoolean) {
        mockClient.prepareResponseFrom(abstractRequest -> {
            if (!(abstractRequest instanceof ShareGroupHeartbeatRequest)) {
                return false;
            }
            ShareGroupHeartbeatRequest shareGroupHeartbeatRequest = (ShareGroupHeartbeatRequest) abstractRequest;
            if (shareGroupHeartbeatRequest.data().memberEpoch() == 0) {
                atomicReference.set(Uuid.fromString(shareGroupHeartbeatRequest.data().memberId()));
            }
            if (shareGroupHeartbeatRequest.data().memberEpoch() == -1) {
                atomicBoolean.set(true);
            } else {
                shareGroupHeartbeat(mockClient, node, atomicReference, i + 1, topicIdPartition, atomicInteger, atomicBoolean);
            }
            atomicInteger.addAndGet(1);
            return true;
        }, (AbstractResponse) shareGroupHeartbeatResponse(atomicReference.get(), i, topicIdPartition), node);
    }

    private ShareGroupHeartbeatResponse shareGroupHeartbeatResponse(Uuid uuid, int i, TopicIdPartition topicIdPartition) {
        if (i == -1) {
            return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData().setMemberId(uuid != null ? uuid.toString() : null).setMemberEpoch(i).setHeartbeatIntervalMs(1000));
        }
        LinkedList linkedList = new LinkedList();
        linkedList.add(new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topicIdPartition.topicId()).setPartitions(List.of(Integer.valueOf(topicIdPartition.partition()))));
        return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData().setMemberId(uuid != null ? uuid.toString() : null).setMemberEpoch(i).setHeartbeatIntervalMs(1000).setAssignment(new ShareGroupHeartbeatResponseData.Assignment().setTopicPartitions(linkedList)));
    }

    private ShareFetchResponse shareFetchResponse(TopicIdPartition topicIdPartition, int i) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(KafkaChannelTest.MAX_RECEIVE_SIZE), Compression.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i2 = 0; i2 < i; i2++) {
            try {
                builder.append(0L, ("key-" + i2).getBytes(), ("value-" + i2).getBytes());
            } catch (Throwable th) {
                if (builder != null) {
                    try {
                        builder.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        MemoryRecords build = builder.build();
        if (builder != null) {
            builder.close();
        }
        return new ShareFetchResponse(new ShareFetchResponseData().setResponses(List.of(new ShareFetchResponseData.ShareFetchableTopicResponse().setTopicId(topicIdPartition.topicId()).setPartitions(List.of(new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()).setRecords(build).setAcquiredRecords(List.of(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(i - 1).setDeliveryCount((short) 1))))))));
    }

    private ShareAcknowledgeResponse shareAcknowledgeResponse() {
        return new ShareAcknowledgeResponse(new ShareAcknowledgeResponseData());
    }

    private ShareAcknowledgeResponse shareAcknowledgeResponse(TopicIdPartition topicIdPartition) {
        return new ShareAcknowledgeResponse(new ShareAcknowledgeResponseData().setResponses(List.of(new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponse().setTopicId(topicIdPartition.topicId()).setPartitions(List.of(new ShareAcknowledgeResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()).setErrorCode(Errors.NONE.code()))))));
    }
}
