package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.class */
public class RecordAccumulatorTest {
    private final String topic = "test";
    private final int partition1 = 0;
    private final int partition2 = 1;
    private final int partition3 = 2;
    private final Node node1 = new Node(0, "localhost", 1111);
    private final Node node2 = new Node(1, "localhost", 1112);
    private final TopicPartition tp1 = new TopicPartition("test", 0);
    private final TopicPartition tp2 = new TopicPartition("test", 1);
    private final TopicPartition tp3 = new TopicPartition("test", 2);
    private final MetadataResponse.PartitionMetadata partMetadata1 = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.empty(), (List) null, (List) null, (List) null);
    private final MetadataResponse.PartitionMetadata partMetadata2 = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp2, Optional.of(Integer.valueOf(this.node1.id())), Optional.empty(), (List) null, (List) null, (List) null);
    private final MetadataResponse.PartitionMetadata partMetadata3 = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp3, Optional.of(Integer.valueOf(this.node2.id())), Optional.empty(), (List) null, (List) null, (List) null);
    private final List<MetadataResponse.PartitionMetadata> partMetadatas = new ArrayList(Arrays.asList(this.partMetadata1, this.partMetadata2, this.partMetadata3));
    private final Map<Integer, Node> nodes = (Map) Stream.of((Object[]) new Node[]{this.node1, this.node2}).collect(Collectors.toMap((v0) -> {
        return v0.id();
    }, Function.identity()));
    private MetadataSnapshot metadataCache = new MetadataSnapshot((String) null, this.nodes, this.partMetadatas, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
    private final Cluster cluster = this.metadataCache.cluster();
    private final MockTime time = new MockTime();
    private final byte[] key = "key".getBytes();
    private final byte[] value = "value".getBytes();
    private final int msgSize = DefaultRecord.sizeInBytes(0, 0, this.key.length, this.value.length, Record.EMPTY_HEADERS);
    private final Metrics metrics = new Metrics(this.time);
    private final long maxBlockTimeMs = 1000;
    private final LogContext logContext = new LogContext();
    private AtomicInteger mockRandom = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/RecordAccumulatorTest$BatchDrainedResult.class */
    public static class BatchDrainedResult {
        final int numSplit;
        final int numBatches;

        BatchDrainedResult(int i, int i2) {
            this.numBatches = i2;
            this.numSplit = i;
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/RecordAccumulatorTest$SequentialPartitioner.class */
    private class SequentialPartitioner extends BuiltInPartitioner {
        public SequentialPartitioner(LogContext logContext, String str, int i) {
            super(logContext, str, i);
        }

        int randomPartition() {
            return RecordAccumulatorTest.this.mockRandom == null ? super.randomPartition() : RecordAccumulatorTest.this.mockRandom.getAndIncrement();
        }
    }

    @BeforeEach
    void setup() {
    }

    @AfterEach
    public void teardown() {
        this.metrics.close();
    }

    @Test
    public void testDrainBatches() throws Exception {
        TopicPartition topicPartition = new TopicPartition("test", 3);
        MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, topicPartition, Optional.of(Integer.valueOf(this.node2.id())), Optional.empty(), (List) null, (List) null, (List) null);
        this.partMetadatas.add(partitionMetadata);
        Cluster cluster = new Cluster((String) null, Arrays.asList(this.node1, this.node2), Arrays.asList(MetadataResponse.toPartitionInfo(this.partMetadata1, this.nodes), MetadataResponse.toPartitionInfo(this.partMetadata2, this.nodes), MetadataResponse.toPartitionInfo(this.partMetadata3, this.nodes), MetadataResponse.toPartitionInfo(partitionMetadata, this.nodes)), Collections.emptySet(), Collections.emptySet());
        this.metadataCache = new MetadataSnapshot((String) null, this.nodes, this.partMetadatas, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap(), cluster);
        long length = this.value.length + 61;
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator((int) length, 2147483647L, Compression.NONE, 10);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 2, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 3, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        verifyTopicPartitionInBatches(createTestRecordAccumulator.drain(this.metadataCache, new HashSet(Arrays.asList(this.node1, this.node2)), (int) length, 0L), this.tp1, this.tp3);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 2, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        verifyTopicPartitionInBatches(createTestRecordAccumulator.drain(this.metadataCache, new HashSet(Arrays.asList(this.node1, this.node2)), (int) length, 0L), this.tp2, topicPartition);
        verifyTopicPartitionInBatches(createTestRecordAccumulator.drain(this.metadataCache, new HashSet(Arrays.asList(this.node1, this.node2)), (int) length, 0L), this.tp1, this.tp3);
        createTestRecordAccumulator.append("test", 1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 2, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 3, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.mutePartition(topicPartition);
        verifyTopicPartitionInBatches(createTestRecordAccumulator.drain(this.metadataCache, new HashSet(Arrays.asList(this.node1, this.node2)), (int) length, 0L), this.tp2, this.tp3);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.append("test", 2, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), cluster);
        createTestRecordAccumulator.unmutePartition(topicPartition);
        verifyTopicPartitionInBatches(createTestRecordAccumulator.drain(this.metadataCache, new HashSet(Arrays.asList(this.node1, this.node2)), Integer.MAX_VALUE, 0L), this.tp1, this.tp2, this.tp3, topicPartition);
    }

    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> map, TopicPartition... topicPartitionArr) {
        Assertions.assertEquals(topicPartitionArr.length, (int) map.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).count());
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Integer, List<ProducerBatch>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            arrayList.addAll((List) it.next().getValue().stream().map(producerBatch -> {
                return producerBatch.topicPartition;
            }).collect(Collectors.toList()));
        }
        for (int i = 0; i < topicPartitionArr.length; i++) {
            Assertions.assertEquals(topicPartitionArr[i], arrayList.get(i));
        }
    }

    @Test
    public void testFull() throws Exception {
        long milliseconds = this.time.milliseconds();
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1025 + 61, 10 * 1025, Compression.NONE, 10);
        int expectedNumAppends = expectedNumAppends(1025);
        for (int i = 0; i < expectedNumAppends; i++) {
            createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.metadataCache.cluster());
            Deque deque = createTestRecordAccumulator.getDeque(this.tp1);
            Assertions.assertEquals(1, deque.size());
            Assertions.assertTrue(((ProducerBatch) deque.peekFirst()).isWritable());
            Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, milliseconds).readyNodes.size(), "No partitions should be ready.");
        }
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.metadataCache.cluster());
        Deque deque2 = createTestRecordAccumulator.getDeque(this.tp1);
        Assertions.assertEquals(2, deque2.size());
        Assertions.assertTrue(((ProducerBatch) deque2.iterator().next()).isWritable());
        Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Our partition's leader should be ready");
        List list = (List) createTestRecordAccumulator.drain(this.metadataCache, Collections.singleton(this.node1), Integer.MAX_VALUE, 0L).get(Integer.valueOf(this.node1.id()));
        Assertions.assertEquals(1, list.size());
        Iterator it = ((ProducerBatch) list.get(0)).records().records().iterator();
        for (int i2 = 0; i2 < expectedNumAppends; i2++) {
            Record record = (Record) it.next();
            Assertions.assertEquals(ByteBuffer.wrap(this.key), record.key(), "Keys should match");
            Assertions.assertEquals(ByteBuffer.wrap(this.value), record.value(), "Values should match");
        }
        Assertions.assertFalse(it.hasNext(), "No more records");
    }

    @Test
    public void testAppendLargeCompressed() throws Exception {
        testAppendLarge(Compression.gzip().build());
    }

    @Test
    public void testAppendLargeNonCompressed() throws Exception {
        testAppendLarge(Compression.NONE);
    }

    private void testAppendLarge(Compression compression) throws Exception {
        byte[] bArr = new byte[2 * 512];
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(512 + 61, 10240L, compression, 0);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, bArr, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.metadataCache.cluster());
        Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Our partition's leader should be ready");
        Deque deque = createTestRecordAccumulator.getDeque(this.tp1);
        Assertions.assertEquals(1, deque.size());
        List list = TestUtils.toList(((ProducerBatch) deque.peek()).records().batches());
        Assertions.assertEquals(1, list.size());
        MutableRecordBatch mutableRecordBatch = (MutableRecordBatch) list.get(0);
        Assertions.assertEquals(0L, mutableRecordBatch.baseOffset());
        List list2 = TestUtils.toList((Iterable) mutableRecordBatch);
        Assertions.assertEquals(1, list2.size());
        Record record = (Record) list2.get(0);
        Assertions.assertEquals(0L, record.offset());
        Assertions.assertEquals(ByteBuffer.wrap(this.key), record.key());
        Assertions.assertEquals(ByteBuffer.wrap(bArr), record.value());
        Assertions.assertEquals(0L, record.timestamp());
    }

    @Test
    public void testAppendLargeOldMessageFormatCompressed() throws Exception {
        testAppendLargeOldMessageFormat(Compression.gzip().build());
    }

    @Test
    public void testAppendLargeOldMessageFormatNonCompressed() throws Exception {
        testAppendLargeOldMessageFormat(Compression.NONE);
    }

    private void testAppendLargeOldMessageFormat(Compression compression) throws Exception {
        byte[] bArr = new byte[2 * 512];
        new ApiVersions().update(this.node1.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(512 + 61, 10240L, compression, 0);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, bArr, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.metadataCache.cluster());
        Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Our partition's leader should be ready");
        Deque deque = createTestRecordAccumulator.getDeque(this.tp1);
        Assertions.assertEquals(1, deque.size());
        List list = TestUtils.toList(((ProducerBatch) deque.peek()).records().batches());
        Assertions.assertEquals(1, list.size());
        MutableRecordBatch mutableRecordBatch = (MutableRecordBatch) list.get(0);
        Assertions.assertEquals(0L, mutableRecordBatch.baseOffset());
        List list2 = TestUtils.toList((Iterable) mutableRecordBatch);
        Assertions.assertEquals(1, list2.size());
        Record record = (Record) list2.get(0);
        Assertions.assertEquals(0L, record.offset());
        Assertions.assertEquals(ByteBuffer.wrap(this.key), record.key());
        Assertions.assertEquals(ByteBuffer.wrap(bArr), record.value());
        Assertions.assertEquals(0L, record.timestamp());
    }

    @Test
    public void testLinger() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1085, 10240L, Compression.NONE, 10);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partitions should be ready");
        this.time.sleep(10L);
        Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Our partition's leader should be ready");
        List list = (List) createTestRecordAccumulator.drain(this.metadataCache, Collections.singleton(this.node1), Integer.MAX_VALUE, 0L).get(Integer.valueOf(this.node1.id()));
        Assertions.assertEquals(1, list.size());
        Iterator it = ((ProducerBatch) list.get(0)).records().records().iterator();
        Record record = (Record) it.next();
        Assertions.assertEquals(ByteBuffer.wrap(this.key), record.key(), "Keys should match");
        Assertions.assertEquals(ByteBuffer.wrap(this.value), record.value(), "Values should match");
        Assertions.assertFalse(it.hasNext(), "No more records");
    }

    @Test
    public void testPartialDrain() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1085, 10240L, Compression.NONE, 10);
        int i = (KafkaChannelTest.MAX_RECEIVE_SIZE / this.msgSize) + 1;
        for (TopicPartition topicPartition : Arrays.asList(this.tp1, this.tp2)) {
            for (int i2 = 0; i2 < i; i2++) {
                createTestRecordAccumulator.append(topicPartition.topic(), topicPartition.partition(), 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
            }
        }
        Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Partition's leader should be ready");
        Assertions.assertEquals(1, ((List) createTestRecordAccumulator.drain(this.metadataCache, Collections.singleton(this.node1), KafkaChannelTest.MAX_RECEIVE_SIZE, 0L).get(Integer.valueOf(this.node1.id()))).size(), "But due to size bound only one partition should have been retrieved");
    }

    @Test
    public void testStressfulSituation() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1085, 10240L, Compression.NONE, 0);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            arrayList.add(new Thread(() -> {
                for (int i2 = 0; i2 < 10000; i2++) {
                    try {
                        createTestRecordAccumulator.append("test", i2 % 2, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).start();
        }
        int i2 = 0;
        long milliseconds = this.time.milliseconds();
        while (i2 < 50000) {
            List<ProducerBatch> list = (List) createTestRecordAccumulator.drain(this.metadataCache, createTestRecordAccumulator.ready(this.metadataCache, milliseconds).readyNodes, 5120, 0L).get(Integer.valueOf(this.node1.id()));
            if (list != null) {
                for (ProducerBatch producerBatch : list) {
                    for (Record record : producerBatch.records().records()) {
                        i2++;
                    }
                    createTestRecordAccumulator.deallocate(producerBatch);
                }
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Thread) it2.next()).join();
        }
    }

    @Test
    public void testNextReadyCheckDelay() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1025 + 61, 10 * 1025, Compression.NONE, 10);
        int expectedNumAppends = expectedNumAppends(1025);
        for (int i = 0; i < expectedNumAppends; i++) {
            createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        }
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertEquals(0, ready.readyNodes.size(), "No nodes should be ready.");
        Assertions.assertEquals(10, ready.nextReadyCheckDelayMs, "Next check time should be the linger time");
        this.time.sleep(10 / 2);
        for (int i2 = 0; i2 < expectedNumAppends; i2++) {
            createTestRecordAccumulator.append("test", 2, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        }
        RecordAccumulator.ReadyCheckResult ready2 = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertEquals(0, ready2.readyNodes.size(), "No nodes should be ready.");
        Assertions.assertEquals(10 / 2, ready2.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time");
        for (int i3 = 0; i3 < expectedNumAppends + 1; i3++) {
            createTestRecordAccumulator.append("test", 1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        }
        RecordAccumulator.ReadyCheckResult ready3 = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(this.node1), ready3.readyNodes, "Node1 should be ready");
        Assertions.assertTrue(ready3.nextReadyCheckDelayMs <= ((long) 10), "Next check time should be defined by node2, at most linger time");
    }

    @Test
    public void testRetryBackoff() throws Exception {
        RecordAccumulator recordAccumulator = new RecordAccumulator(this.logContext, 1085, Compression.NONE, 134217727, 268435455L, 268435455 * 10, Integer.MAX_VALUE, this.metrics, "producer-metrics", this.time, new ApiVersions(), (TransactionManager) null, new BufferPool(10240L, 1085, this.metrics, this.time, "producer-metrics"));
        long milliseconds = this.time.milliseconds();
        recordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        RecordAccumulator.ReadyCheckResult ready = recordAccumulator.ready(this.metadataCache, milliseconds + 134217727 + 1);
        Assertions.assertEquals(Collections.singleton(this.node1), ready.readyNodes, "Node1 should be ready");
        Map drain = recordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, milliseconds + 134217727 + 1);
        Assertions.assertEquals(1, drain.size(), "Node1 should be the only ready node.");
        Assertions.assertEquals(1, ((List) drain.get(0)).size(), "Partition 0 should only have one batch drained.");
        long milliseconds2 = this.time.milliseconds();
        recordAccumulator.reenqueue((ProducerBatch) ((List) drain.get(0)).get(0), milliseconds2);
        recordAccumulator.append("test", 1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        RecordAccumulator.ReadyCheckResult ready2 = recordAccumulator.ready(this.metadataCache, milliseconds2 + 134217727 + 1);
        Assertions.assertEquals(Collections.singleton(this.node1), ready2.readyNodes, "Node1 should be ready");
        Map drain2 = recordAccumulator.drain(this.metadataCache, ready2.readyNodes, Integer.MAX_VALUE, milliseconds2 + 134217727 + 1);
        Assertions.assertEquals(1, drain2.size(), "Node1 should be the only ready node.");
        Assertions.assertEquals(1, ((List) drain2.get(0)).size(), "Node1 should only have one batch drained.");
        Assertions.assertEquals(this.tp2, ((ProducerBatch) ((List) drain2.get(0)).get(0)).topicPartition, "Node1 should only have one batch for partition 1.");
        long j = (long) (268435455 * 1.2d);
        RecordAccumulator.ReadyCheckResult ready3 = recordAccumulator.ready(this.metadataCache, milliseconds2 + j + 1);
        Assertions.assertEquals(Collections.singleton(this.node1), ready3.readyNodes, "Node1 should be ready");
        Map drain3 = recordAccumulator.drain(this.metadataCache, ready3.readyNodes, Integer.MAX_VALUE, milliseconds2 + j + 1);
        Assertions.assertEquals(1, drain3.size(), "Node1 should be the only ready node.");
        Assertions.assertEquals(1, ((List) drain3.get(0)).size(), "Node1 should only have one batch drained.");
        Assertions.assertEquals(this.tp1, ((ProducerBatch) ((List) drain3.get(0)).get(0)).topicPartition, "Node1 should only have one batch for partition 0.");
    }

    private Map<Integer, List<ProducerBatch>> drainAndCheckBatchAmount(MetadataSnapshot metadataSnapshot, Node node, RecordAccumulator recordAccumulator, long j, int i) {
        RecordAccumulator.ReadyCheckResult ready = recordAccumulator.ready(metadataSnapshot, j);
        if (i <= 0) {
            Assertions.assertEquals(0, ready.readyNodes.size(), "Leader should not be ready");
            Assertions.assertEquals(0, recordAccumulator.drain(metadataSnapshot, ready.readyNodes, Integer.MAX_VALUE, j).size(), "Leader should not be drained.");
            return null;
        }
        Assertions.assertEquals(Collections.singleton(node), ready.readyNodes, "Leader should be ready");
        Map<Integer, List<ProducerBatch>> drain = recordAccumulator.drain(metadataSnapshot, ready.readyNodes, Integer.MAX_VALUE, j);
        Assertions.assertEquals(i, drain.size(), "Leader should be the only ready node.");
        Assertions.assertEquals(i, drain.get(Integer.valueOf(node.id())).size(), "Partition should only have " + i + " batch drained.");
        return drain;
    }

    @Test
    public void testExponentialRetryBackoff() throws Exception {
        RecordAccumulator recordAccumulator = new RecordAccumulator(this.logContext, 1085, Compression.NONE, 134217727, 100L, 1000L, Integer.MAX_VALUE, this.metrics, "producer-metrics", this.time, new ApiVersions(), (TransactionManager) null, new BufferPool(10240L, 1085, this.metrics, this.time, "producer-metrics"));
        long milliseconds = this.time.milliseconds();
        recordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        ProducerBatch producerBatch = drainAndCheckBatchAmount(this.metadataCache, this.node1, recordAccumulator, milliseconds + 134217727 + 1, 1).get(0).get(0);
        long j = 0;
        int i = 0;
        while (j < 1000 * 0.8d) {
            recordAccumulator.reenqueue(producerBatch, this.time.milliseconds());
            long pow = (long) (100 * Math.pow(2.0d, i) * 0.8d);
            long pow2 = (long) (100 * Math.pow(2.0d, i) * 1.2d);
            j = pow2;
            drainAndCheckBatchAmount(this.metadataCache, this.node1, recordAccumulator, (milliseconds + pow) - 1, 0);
            drainAndCheckBatchAmount(this.metadataCache, this.node1, recordAccumulator, milliseconds + pow2 + 1, 1);
            i++;
        }
    }

    @Test
    public void testExponentialRetryBackoffLeaderChange() throws Exception {
        MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.empty(), (List) null, (List) null, (List) null);
        MetadataResponse.PartitionMetadata partitionMetadata2 = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node2.id())), Optional.empty(), (List) null, (List) null, (List) null);
        MetadataResponse.PartitionMetadata partitionMetadata3 = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp2, Optional.of(Integer.valueOf(this.node1.id())), Optional.empty(), (List) null, (List) null, (List) null);
        MetadataResponse.PartitionMetadata partitionMetadata4 = new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp3, Optional.of(Integer.valueOf(this.node2.id())), Optional.empty(), (List) null, (List) null, (List) null);
        MetadataSnapshot metadataSnapshot = new MetadataSnapshot((String) null, this.nodes, Arrays.asList(partitionMetadata, partitionMetadata3, partitionMetadata4), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
        MetadataSnapshot metadataSnapshot2 = new MetadataSnapshot((String) null, this.nodes, Arrays.asList(partitionMetadata2, partitionMetadata3, partitionMetadata4), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
        RecordAccumulator recordAccumulator = new RecordAccumulator(this.logContext, 1085, Compression.NONE, 134217727, 100L, 1000L, Integer.MAX_VALUE, this.metrics, "producer-metrics", this.time, new ApiVersions(), (TransactionManager) null, new BufferPool(10240L, 1085, this.metrics, this.time, "producer-metrics"));
        long milliseconds = this.time.milliseconds();
        recordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        ProducerBatch producerBatch = drainAndCheckBatchAmount(metadataSnapshot, this.node1, recordAccumulator, milliseconds + 134217727 + 1, 1).get(0).get(0);
        recordAccumulator.reenqueue(producerBatch, this.time.milliseconds());
        drainAndCheckBatchAmount(metadataSnapshot, this.node1, recordAccumulator, (milliseconds + ((long) (100 * 0.8d))) - 1, 0);
        drainAndCheckBatchAmount(metadataSnapshot, this.node1, recordAccumulator, milliseconds + ((long) (100 * 1.2d)) + 1, 1);
        recordAccumulator.reenqueue(producerBatch, this.time.milliseconds());
        drainAndCheckBatchAmount(metadataSnapshot, this.node1, recordAccumulator, (milliseconds + ((long) ((100 * 2) * 0.8d))) - 1, 0);
        drainAndCheckBatchAmount(metadataSnapshot, this.node1, recordAccumulator, milliseconds + ((long) (100 * 2 * 1.2d)) + 1, 1);
        recordAccumulator.reenqueue(producerBatch, this.time.milliseconds());
        long pow = (long) (100 * Math.pow(2.0d, 2.0d) * 0.8d);
        long pow2 = (long) (100 * Math.pow(2.0d, 2.0d) * 1.2d);
        drainAndCheckBatchAmount(metadataSnapshot2, this.node2, recordAccumulator, (milliseconds + pow) - 1, 0);
        drainAndCheckBatchAmount(metadataSnapshot2, this.node2, recordAccumulator, milliseconds + pow2 + 1, 1);
        recordAccumulator.reenqueue(producerBatch, this.time.milliseconds());
        drainAndCheckBatchAmount(metadataSnapshot2, this.node2, recordAccumulator, (milliseconds + ((long) ((100 * Math.pow(2.0d, 3.0d)) * 0.8d))) - 1, 0);
        drainAndCheckBatchAmount(metadataSnapshot2, this.node2, recordAccumulator, milliseconds + 1000 + 1, 1);
    }

    @Test
    public void testFlush() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(4157, 65536L, Compression.NONE, Integer.MAX_VALUE);
        for (int i = 0; i < 100; i++) {
            createTestRecordAccumulator.append("test", i % 3, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
            Assertions.assertTrue(createTestRecordAccumulator.hasIncomplete());
        }
        Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No nodes should be ready.");
        createTestRecordAccumulator.beginFlush();
        Map drain = createTestRecordAccumulator.drain(this.metadataCache, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue(createTestRecordAccumulator.hasIncomplete());
        Iterator it = drain.values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((List) it.next()).iterator();
            while (it2.hasNext()) {
                createTestRecordAccumulator.deallocate((ProducerBatch) it2.next());
            }
        }
        createTestRecordAccumulator.awaitFlushCompletion();
        Assertions.assertFalse(createTestRecordAccumulator.hasUndrained());
        Assertions.assertFalse(createTestRecordAccumulator.hasIncomplete());
    }

    private void delayedInterrupt(Thread thread, long j) {
        new Thread(() -> {
            Time.SYSTEM.sleep(j);
            thread.interrupt();
        }).start();
    }

    @Test
    public void testAwaitFlushComplete() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(4157, 65536L, Compression.NONE, Integer.MAX_VALUE);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        createTestRecordAccumulator.beginFlush();
        Assertions.assertTrue(createTestRecordAccumulator.flushInProgress());
        delayedInterrupt(Thread.currentThread(), 1000L);
        try {
            createTestRecordAccumulator.awaitFlushCompletion();
            Assertions.fail("awaitFlushCompletion should throw InterruptException");
        } catch (InterruptedException e) {
            Assertions.assertFalse(createTestRecordAccumulator.flushInProgress(), "flushInProgress count should be decremented even if thread is interrupted");
        }
    }

    @Test
    public void testAbortIncompleteBatches() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(189, 65536L, Compression.NONE, Integer.MAX_VALUE);
        for (int i = 0; i < 100; i++) {
            createTestRecordAccumulator.append("test", i % 3, 0L, this.key, this.value, (Header[]) null, new RecordAccumulator.AppendCallbacks() { // from class: org.apache.kafka.clients.producer.internals.RecordAccumulatorTest.1TestCallback
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    Assertions.assertEquals("Producer is closed forcefully.", exc.getMessage());
                    atomicInteger.incrementAndGet();
                }

                public void setPartition(int i2) {
                }
            }, 1000L, this.time.milliseconds(), this.cluster);
        }
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertFalse(ready.readyNodes.isEmpty());
        Map drain = createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue(createTestRecordAccumulator.hasUndrained());
        Assertions.assertTrue(createTestRecordAccumulator.hasIncomplete());
        int i2 = 0;
        Iterator it = drain.entrySet().iterator();
        while (it.hasNext()) {
            for (ProducerBatch producerBatch : (List) ((Map.Entry) it.next()).getValue()) {
                Assertions.assertTrue(producerBatch.isClosed());
                Assertions.assertFalse(producerBatch.produceFuture.completed());
                i2 += producerBatch.recordCount;
            }
        }
        Assertions.assertTrue(i2 > 0 && i2 < 100);
        createTestRecordAccumulator.abortIncompleteBatches();
        Assertions.assertEquals(100, atomicInteger.get());
        Assertions.assertFalse(createTestRecordAccumulator.hasUndrained());
        Assertions.assertFalse(createTestRecordAccumulator.hasIncomplete());
    }

    @Test
    public void testAbortUnsentBatches() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(189, 65536L, Compression.NONE, Integer.MAX_VALUE);
        final KafkaException kafkaException = new KafkaException();
        for (int i = 0; i < 100; i++) {
            createTestRecordAccumulator.append("test", i % 3, 0L, this.key, this.value, (Header[]) null, new RecordAccumulator.AppendCallbacks() { // from class: org.apache.kafka.clients.producer.internals.RecordAccumulatorTest.2TestCallback
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    Assertions.assertEquals(kafkaException, exc);
                    atomicInteger.incrementAndGet();
                }

                public void setPartition(int i2) {
                }
            }, 1000L, this.time.milliseconds(), this.cluster);
        }
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertFalse(ready.readyNodes.isEmpty());
        Map drain = createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue(createTestRecordAccumulator.hasUndrained());
        Assertions.assertTrue(createTestRecordAccumulator.hasIncomplete());
        createTestRecordAccumulator.abortUndrainedBatches(kafkaException);
        int i2 = 0;
        Iterator it = drain.entrySet().iterator();
        while (it.hasNext()) {
            for (ProducerBatch producerBatch : (List) ((Map.Entry) it.next()).getValue()) {
                Assertions.assertTrue(producerBatch.isClosed());
                Assertions.assertFalse(producerBatch.produceFuture.completed());
                i2 += producerBatch.recordCount;
            }
        }
        Assertions.assertTrue(i2 > 0);
        Assertions.assertTrue(atomicInteger.get() > 0);
        Assertions.assertEquals(100, atomicInteger.get() + i2);
        Assertions.assertFalse(createTestRecordAccumulator.hasUndrained());
        Assertions.assertTrue(createTestRecordAccumulator.hasIncomplete());
    }

    private void doExpireBatchSingle(int i) throws InterruptedException {
        List<Boolean> asList = Arrays.asList(false, true);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(i, 1025 + 61, 10 * 1025, Compression.NONE, 300);
        for (Boolean bool : asList) {
            if (this.time.milliseconds() < System.currentTimeMillis()) {
                this.time.setCurrentTimeMs(System.currentTimeMillis());
            }
            createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
            Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partition should be ready.");
            this.time.sleep(300);
            Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Our partition's leader should be ready");
            Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch should not expire when just linger has passed");
            if (bool.booleanValue()) {
                createTestRecordAccumulator.mutePartition(this.tp1);
            } else {
                createTestRecordAccumulator.unmutePartition(this.tp1);
            }
            this.time.sleep(i - 300);
            Assertions.assertEquals(1, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch may expire when the partition is muted");
            Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
        }
    }

    @Test
    public void testExpiredBatchSingle() throws InterruptedException {
        doExpireBatchSingle(3200);
    }

    @Test
    public void testExpiredBatchSingleMaxValue() throws InterruptedException {
        doExpireBatchSingle(Integer.MAX_VALUE);
    }

    @Test
    public void testExpiredBatches() throws InterruptedException {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(3200, 1025 + 61, 10 * 1025, Compression.NONE, 30);
        int expectedNumAppends = expectedNumAppends(1025);
        for (int i = 0; i < expectedNumAppends; i++) {
            createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
            Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
        }
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 0L, this.time.milliseconds(), this.cluster);
        Set set = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes;
        Assertions.assertEquals(Collections.singleton(this.node1), set, "Our partition's leader should be ready");
        this.time.sleep(3200 + 1);
        createTestRecordAccumulator.mutePartition(this.tp1);
        Assertions.assertEquals(2, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batches will be muted no matter if the partition is muted or not");
        createTestRecordAccumulator.unmutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "All batches should have been expired earlier");
        Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
        this.time.sleep(30);
        Assertions.assertEquals(Collections.singleton(this.node1), set, "Our partition's leader should be ready");
        this.time.sleep(60 + 1);
        createTestRecordAccumulator.mutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch should not be expired when metadata is still available and partition is muted");
        createTestRecordAccumulator.unmutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "All batches should have been expired");
        Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 0L, this.time.milliseconds(), this.cluster);
        this.time.sleep(30);
        Set set2 = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes;
        Assertions.assertEquals(Collections.singleton(this.node1), set2, "Our partition's leader should be ready");
        Map drain = createTestRecordAccumulator.drain(this.metadataCache, set2, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals(((List) drain.get(Integer.valueOf(this.node1.id()))).size(), 1, "There should be only one batch.");
        this.time.sleep(1000L);
        createTestRecordAccumulator.reenqueue((ProducerBatch) ((List) drain.get(Integer.valueOf(this.node1.id()))).get(0), this.time.milliseconds());
        this.time.sleep(60 + 100);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch should not be expired.");
        this.time.sleep(1L);
        createTestRecordAccumulator.mutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch should not be expired when the partition is muted");
        createTestRecordAccumulator.unmutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "All batches should have been expired.");
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 0L, this.time.milliseconds(), this.cluster);
        this.time.sleep(30);
        Assertions.assertEquals(Collections.singleton(this.node1), createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, "Our partition's leader should be ready");
        this.time.sleep(60 + 1);
        createTestRecordAccumulator.mutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch should not be expired when the partition is muted");
        createTestRecordAccumulator.unmutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "The batch should not be expired when the partition is muted");
        this.time.sleep(100L);
        Assertions.assertEquals(0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "All batches should have been expired earlier");
        Assertions.assertEquals(1, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
    }

    @Test
    public void testMutedPartitions() throws InterruptedException {
        long milliseconds = this.time.milliseconds();
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1025 + 61, 10 * 1025, Compression.NONE, 10);
        int expectedNumAppends = expectedNumAppends(1025);
        for (int i = 0; i < expectedNumAppends; i++) {
            createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
            Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, milliseconds).readyNodes.size(), "No partitions should be ready.");
        }
        this.time.sleep(2000L);
        createTestRecordAccumulator.mutePartition(this.tp1);
        Assertions.assertEquals(0, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.size(), "No node should be ready");
        createTestRecordAccumulator.unmutePartition(this.tp1);
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertFalse(ready.readyNodes.isEmpty(), "The batch should be ready");
        createTestRecordAccumulator.mutePartition(this.tp1);
        Assertions.assertEquals(0, ((List) createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).get(Integer.valueOf(this.node1.id()))).size(), "No batch should have been drained");
        createTestRecordAccumulator.unmutePartition(this.tp1);
        Assertions.assertFalse(((List) createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).get(Integer.valueOf(this.node1.id()))).isEmpty(), "The batch should have been drained.");
    }

    @Test
    public void testRecordsDrainedWhenTransactionCompleting() throws Exception {
        TransactionManager transactionManager = (TransactionManager) Mockito.mock(TransactionManager.class);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(transactionManager, 3200, 1025, 10 * 1025, Compression.NONE, 10);
        Mockito.when(transactionManager.producerIdAndEpoch()).thenReturn(new ProducerIdAndEpoch(12345L, (short) 5));
        Mockito.when(Boolean.valueOf(transactionManager.isSendToPartitionAllowed(this.tp1))).thenReturn(true);
        Mockito.when(Boolean.valueOf(transactionManager.transactionContainsPartition(this.tp1))).thenReturn(true);
        Mockito.when(Integer.valueOf(transactionManager.firstInFlightSequence(this.tp1))).thenReturn(0);
        Mockito.when(Boolean.valueOf(transactionManager.isCompleting())).thenReturn(false);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        Assertions.assertTrue(createTestRecordAccumulator.hasUndrained());
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertEquals(0, ready.readyNodes.size());
        Assertions.assertEquals(0, createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).size());
        Mockito.when(Boolean.valueOf(transactionManager.isCompleting())).thenReturn(true);
        RecordAccumulator.ReadyCheckResult ready2 = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertEquals(1, ready2.readyNodes.size());
        Node node = (Node) ready2.readyNodes.iterator().next();
        Map drain = createTestRecordAccumulator.drain(this.metadataCache, ready2.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(Integer.valueOf(node.id())), drain.keySet());
        Assertions.assertEquals(1, ((List) drain.get(Integer.valueOf(node.id()))).size());
    }

    @Test
    public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
        long milliseconds = this.time.milliseconds();
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(KafkaChannelTest.MAX_RECEIVE_SIZE, 10240L, Compression.gzip().build(), 10);
        ProducerBatch producerBatch = new ProducerBatch(this.tp1, MemoryRecords.builder(ByteBuffer.allocate(4096), Compression.NONE, TimestampType.CREATE_TIME, 0L), milliseconds, true);
        byte[] bArr = new byte[KafkaChannelTest.MAX_RECEIVE_SIZE];
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Callback callback = (recordMetadata, exc) -> {
            atomicInteger.incrementAndGet();
        };
        FutureRecordMetadata tryAppend = producerBatch.tryAppend(milliseconds, (byte[]) null, bArr, Record.EMPTY_HEADERS, callback, milliseconds);
        FutureRecordMetadata tryAppend2 = producerBatch.tryAppend(milliseconds, (byte[]) null, bArr, Record.EMPTY_HEADERS, callback, milliseconds);
        Assertions.assertNotNull(tryAppend);
        Assertions.assertNotNull(tryAppend2);
        producerBatch.close();
        createTestRecordAccumulator.reenqueue(producerBatch, milliseconds);
        this.time.sleep(121L);
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertFalse(ready.readyNodes.isEmpty(), "The batch should be ready");
        Map drain = createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals(1, drain.size(), "Only node1 should be drained");
        Assertions.assertEquals(1, ((List) drain.get(Integer.valueOf(this.node1.id()))).size(), "Only one batch should be drained");
        createTestRecordAccumulator.splitAndReenqueue((ProducerBatch) ((List) drain.get(Integer.valueOf(this.node1.id()))).get(0));
        this.time.sleep(101L);
        Map drain2 = createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertFalse(drain2.isEmpty());
        Assertions.assertFalse(((List) drain2.get(Integer.valueOf(this.node1.id()))).isEmpty());
        ((ProducerBatch) ((List) drain2.get(Integer.valueOf(this.node1.id()))).get(0)).complete(atomicInteger.get(), 100L);
        Assertions.assertEquals(1, atomicInteger.get(), "The first message should have been acked.");
        Assertions.assertTrue(tryAppend.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) tryAppend.get()).offset());
        Map drain3 = createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertFalse(drain3.isEmpty());
        Assertions.assertFalse(((List) drain3.get(Integer.valueOf(this.node1.id()))).isEmpty());
        ((ProducerBatch) ((List) drain3.get(Integer.valueOf(this.node1.id()))).get(0)).complete(atomicInteger.get(), 100L);
        Assertions.assertEquals(2, atomicInteger.get(), "Both message should have been acked.");
        Assertions.assertTrue(tryAppend2.isDone());
        Assertions.assertEquals(1L, ((RecordMetadata) tryAppend2.get()).offset());
    }

    @Test
    public void testSplitBatchOffAccumulator() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        CompressionRatioEstimator.setEstimation(this.tp1.topic(), CompressionType.GZIP, 0.1f);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(KafkaChannelTest.MAX_RECEIVE_SIZE, 3072L, Compression.gzip().build(), 0);
        int prepareSplitBatches = prepareSplitBatches(createTestRecordAccumulator, currentTimeMillis, 100, 20);
        Assertions.assertTrue(prepareSplitBatches > 0, "There should be some split batches");
        RecordAccumulator.ReadyCheckResult ready = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        for (int i = 0; i < prepareSplitBatches; i++) {
            Map drain = createTestRecordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
            Assertions.assertFalse(drain.isEmpty());
            Assertions.assertFalse(((List) drain.get(Integer.valueOf(this.node1.id()))).isEmpty());
        }
        Assertions.assertTrue(createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes.isEmpty(), "All the batches should have been drained.");
        Assertions.assertEquals(3072L, createTestRecordAccumulator.bufferPoolAvailableMemory(), "The split batches should be allocated off the accumulator");
    }

    @Test
    public void testSplitFrequency() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        Random random = new Random();
        random.setSeed(currentTimeMillis);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(KafkaChannelTest.MAX_RECEIVE_SIZE, 3072L, Compression.gzip().build(), 10);
        int i = 1;
        while (i < 100) {
            int i2 = 0;
            int i3 = 0;
            CompressionRatioEstimator.resetEstimation("test");
            for (int i4 = 0; i4 < 1000; i4++) {
                createTestRecordAccumulator.append("test", 0, 0L, (byte[]) null, random.nextInt(100) < i ? bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100), Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 0L, this.time.milliseconds(), this.cluster);
                BatchDrainedResult completeOrSplitBatches = completeOrSplitBatches(createTestRecordAccumulator, KafkaChannelTest.MAX_RECEIVE_SIZE);
                i2 += completeOrSplitBatches.numSplit;
                i3 += completeOrSplitBatches.numBatches;
            }
            this.time.sleep(10L);
            BatchDrainedResult completeOrSplitBatches2 = completeOrSplitBatches(createTestRecordAccumulator, KafkaChannelTest.MAX_RECEIVE_SIZE);
            int i5 = i2 + completeOrSplitBatches2.numSplit;
            int i6 = i3 + completeOrSplitBatches2.numBatches;
            Assertions.assertTrue(((double) i5) / ((double) i6) < 0.10000000149011612d, String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. Random seed is " + currentTimeMillis, Integer.valueOf(i6), Integer.valueOf(i5)));
            i++;
        }
    }

    @Test
    public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1025 + 61, 10 * 1025, Compression.NONE, 500);
        createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        Assertions.assertTrue(createTestRecordAccumulator.drain(this.metadataCache, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).isEmpty());
        this.time.sleep(500 + 1);
        Assertions.assertEquals(1, createTestRecordAccumulator.drain(this.metadataCache, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).size(), "A batch did not drain after linger");
        createTestRecordAccumulator.append("test", 1, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
        this.time.sleep(500 * 4);
        Assertions.assertEquals(1, createTestRecordAccumulator.drain(this.metadataCache, createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).size(), "A batch did not drain after linger");
    }

    @Test
    public void testExpiredBatchesRetry() throws InterruptedException {
        List<Boolean> asList = Arrays.asList(false, true);
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(1025 + 61, 10 * 1025, Compression.NONE, 3000);
        for (Boolean bool : asList) {
            createTestRecordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 0L, this.time.milliseconds(), this.cluster);
            this.time.sleep(3000);
            Set set = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes;
            Assertions.assertEquals(Collections.singleton(this.node1), set, "Our partition's leader should be ready");
            Map drain = createTestRecordAccumulator.drain(this.metadataCache, set, Integer.MAX_VALUE, this.time.milliseconds());
            Assertions.assertEquals(1, ((List) drain.get(Integer.valueOf(this.node1.id()))).size(), "There should be only one batch.");
            this.time.sleep(1000);
            createTestRecordAccumulator.reenqueue((ProducerBatch) ((List) drain.get(Integer.valueOf(this.node1.id()))).get(0), this.time.milliseconds());
            if (bool.booleanValue()) {
                createTestRecordAccumulator.mutePartition(this.tp1);
            } else {
                createTestRecordAccumulator.unmutePartition(this.tp1);
            }
            this.time.sleep(3200 - 1000);
            createTestRecordAccumulator.drain(this.metadataCache, Collections.singleton(this.node1), Integer.MAX_VALUE, this.time.milliseconds());
            Assertions.assertEquals(bool.booleanValue() ? 1 : 0, createTestRecordAccumulator.expiredBatches(this.time.milliseconds()).size(), "RecordAccumulator has expired batches if the partition is not muted");
        }
    }

    @Test
    public void testUniformBuiltInPartitioner() throws Exception {
        this.mockRandom = new AtomicInteger();
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(KafkaChannelTest.MAX_RECEIVE_SIZE, 1048576L, Compression.NONE, 0);
        final AtomicInteger atomicInteger = new AtomicInteger(-1);
        RecordAccumulator.AppendCallbacks appendCallbacks = new RecordAccumulator.AppendCallbacks() { // from class: org.apache.kafka.clients.producer.internals.RecordAccumulatorTest.1
            public void setPartition(int i) {
                atomicInteger.set(i);
            }

            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            }
        };
        Cluster cluster = new Cluster((String) null, Arrays.asList(this.node1, this.node2), Arrays.asList(MetadataResponse.toPartitionInfo(this.partMetadata1, this.nodes), MetadataResponse.toPartitionInfo(this.partMetadata2, this.nodes), MetadataResponse.toPartitionInfo(this.partMetadata3, this.nodes)), Collections.emptySet(), Collections.emptySet());
        createTestRecordAccumulator.append("test", -1, 0L, (byte[]) null, this.value, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), cluster);
        Assertions.assertEquals(0, atomicInteger.get());
        Assertions.assertEquals(1, this.mockRandom.get());
        byte[] bArr = new byte[KafkaChannelTest.MAX_RECEIVE_SIZE];
        createTestRecordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), cluster);
        Assertions.assertEquals(0, atomicInteger.get());
        Assertions.assertEquals(1, this.mockRandom.get());
        createTestRecordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), cluster);
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(2, this.mockRandom.get());
        createTestRecordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), cluster);
        Assertions.assertEquals(2, atomicInteger.get());
        Assertions.assertEquals(3, this.mockRandom.get());
        createTestRecordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), cluster);
        Assertions.assertEquals(0, atomicInteger.get());
        Assertions.assertEquals(4, this.mockRandom.get());
    }

    @Test
    public void testAdaptiveBuiltInPartitioner() throws Exception {
        this.mockRandom = new AtomicInteger();
        RecordAccumulator recordAccumulator = new RecordAccumulator(this.logContext, 128, Compression.NONE, 0, 0L, 0L, 3200, new RecordAccumulator.PartitionerConfig(true, 100L), this.metrics, "producer-metrics", this.time, new ApiVersions(), null, new BufferPool(1048576L, 128, this.metrics, this.time, "producer-internal-metrics")) { // from class: org.apache.kafka.clients.producer.internals.RecordAccumulatorTest.2
            BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String str, int i) {
                return new SequentialPartitioner(logContext, str, i);
            }
        };
        byte[] bArr = new byte[128];
        int[] iArr = {1, 7, 2};
        int[] iArr2 = new int[iArr.length];
        for (int i = 0; i < iArr.length; i++) {
            iArr2[i] = 8 - iArr[i];
            int i2 = iArr[i];
            while (true) {
                int i3 = i2;
                i2--;
                if (i3 > 0) {
                    recordAccumulator.append("test", i, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
                }
            }
            Assertions.assertEquals(iArr[i], recordAccumulator.getDeque(new TopicPartition("test", i)).size());
        }
        recordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        final AtomicInteger atomicInteger = new AtomicInteger(-1);
        RecordAccumulator.AppendCallbacks appendCallbacks = new RecordAccumulator.AppendCallbacks() { // from class: org.apache.kafka.clients.producer.internals.RecordAccumulatorTest.3
            public void setPartition(int i4) {
                atomicInteger.set(i4);
            }

            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            }
        };
        recordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), this.cluster);
        int loadStatsRangeEnd = recordAccumulator.getBuiltInPartitioner("test").loadStatsRangeEnd() * 2;
        int[] iArr3 = new int[iArr.length];
        for (int i4 = 0; i4 < loadStatsRangeEnd; i4++) {
            recordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), this.cluster);
            int i5 = atomicInteger.get();
            iArr3[i5] = iArr3[i5] + 1;
        }
        for (int i6 = 0; i6 < iArr3.length; i6++) {
            Assertions.assertEquals(iArr2[i6] * 2, iArr3[i6], "Partition " + i6 + " was chosen " + iArr3[i6] + " times");
        }
        recordAccumulator.updateNodeLatencyStats(0, this.time.milliseconds() - 200, true);
        recordAccumulator.updateNodeLatencyStats(0, this.time.milliseconds(), false);
        recordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        recordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), this.cluster);
        int i7 = 10;
        while (true) {
            int i8 = i7;
            i7--;
            if (i8 <= 0) {
                return;
            }
            recordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, appendCallbacks, 1000L, this.time.milliseconds(), this.cluster);
            Assertions.assertEquals(2, atomicInteger.get());
        }
    }

    @Test
    public void testBuiltInPartitionerFractionalBatches() throws Exception {
        RecordAccumulator createTestRecordAccumulator = createTestRecordAccumulator(512, 1048576L, Compression.NONE, 10);
        byte[] bArr = new byte[32];
        int i = 10;
        while (true) {
            int i2 = i;
            i--;
            if (i2 <= 0) {
                return;
            }
            int i3 = ((512 * 2) / 3) / 32;
            while (true) {
                int i4 = i3;
                i3--;
                if (i4 <= 0) {
                    break;
                } else {
                    createTestRecordAccumulator.append("test", -1, 0L, (byte[]) null, bArr, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, this.time.milliseconds(), this.cluster);
                }
            }
            this.time.sleep(10L);
            Set set = createTestRecordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes;
            Assertions.assertEquals(1, set.size(), "Should have 1 leader ready");
            List list = (List) ((Map.Entry) createTestRecordAccumulator.drain(this.metadataCache, set, Integer.MAX_VALUE, 0L).entrySet().iterator().next()).getValue();
            Assertions.assertEquals(1, list.size(), "Should have 1 batch ready");
            int sizeInBytes = ((ProducerBatch) list.get(0)).records().sizeInBytes();
            Assertions.assertTrue(sizeInBytes > 512 / 2, "Batch must be greater than half batch.size");
            Assertions.assertTrue(sizeInBytes < 512, "Batch must be less than batch.size");
        }
    }

    @Test
    public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {
        MetadataSnapshot metadataSnapshot = new MetadataSnapshot((String) null, this.nodes, Collections.singletonList(new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.of(100), (List) null, (List) null, (List) null)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
        RecordAccumulator recordAccumulator = new RecordAccumulator(this.logContext, 10, Compression.NONE, 10, 100, 1000, Integer.MAX_VALUE, this.metrics, "producer-metrics", this.time, new ApiVersions(), (TransactionManager) null, new BufferPool(10240L, 10, this.metrics, this.time, "producer-metrics"));
        long milliseconds = this.time.milliseconds();
        recordAccumulator.append("test", 0, 0L, this.key, this.value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, milliseconds, this.cluster);
        long j = milliseconds + 10 + 1;
        RecordAccumulator.ReadyCheckResult ready = recordAccumulator.ready(metadataSnapshot, j);
        Assertions.assertTrue(ready.readyNodes.contains(this.node1), "Node1 is ready");
        Map drain = recordAccumulator.drain(metadataSnapshot, ready.readyNodes, 999999, j);
        Assertions.assertTrue(drain.containsKey(Integer.valueOf(this.node1.id())) && ((List) drain.get(Integer.valueOf(this.node1.id()))).size() == 1, "Node1 has 1 batch ready & drained");
        ProducerBatch producerBatch = (ProducerBatch) ((List) drain.get(Integer.valueOf(this.node1.id()))).get(0);
        Assertions.assertEquals(OptionalInt.of(100), producerBatch.currentLeaderEpoch());
        Assertions.assertEquals(0, producerBatch.attemptsWhenLeaderLastChanged());
        recordAccumulator.reenqueue(producerBatch, j);
        long j2 = j + 1;
        Assertions.assertFalse(recordAccumulator.ready(metadataSnapshot, j2).readyNodes.contains(this.node1), "Node1 is not ready");
        Map drain2 = recordAccumulator.drain(metadataSnapshot, new HashSet(Collections.singletonList(this.node1)), 999999, j2);
        Assertions.assertTrue(drain2.containsKey(Integer.valueOf(this.node1.id())) && ((List) drain2.get(Integer.valueOf(this.node1.id()))).isEmpty(), "No batches ready to be drained on Node1");
        long j3 = j2 + 1;
        int i = 100 + 1;
        MetadataSnapshot metadataSnapshot2 = new MetadataSnapshot((String) null, this.nodes, Collections.singletonList(new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.of(Integer.valueOf(i)), (List) null, (List) null, (List) null)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
        RecordAccumulator.ReadyCheckResult ready2 = recordAccumulator.ready(metadataSnapshot2, j3);
        Assertions.assertTrue(ready2.readyNodes.contains(this.node1), "Node1 is ready");
        Map drain3 = recordAccumulator.drain(metadataSnapshot2, ready2.readyNodes, 999999, j3);
        Assertions.assertTrue(drain3.containsKey(Integer.valueOf(this.node1.id())) && ((List) drain3.get(Integer.valueOf(this.node1.id()))).size() == 1, "Node1 has 1 batch ready & drained");
        ProducerBatch producerBatch2 = (ProducerBatch) ((List) drain3.get(Integer.valueOf(this.node1.id()))).get(0);
        Assertions.assertEquals(OptionalInt.of(i), producerBatch2.currentLeaderEpoch());
        Assertions.assertEquals(1, producerBatch2.attemptsWhenLeaderLastChanged());
        recordAccumulator.reenqueue(producerBatch2, j3);
        long j4 = j3 + (2 * 1000);
        MetadataSnapshot metadataSnapshot3 = new MetadataSnapshot((String) null, this.nodes, Collections.singletonList(new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.of(Integer.valueOf(i)), (List) null, (List) null, (List) null)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
        RecordAccumulator.ReadyCheckResult ready3 = recordAccumulator.ready(metadataSnapshot3, j4);
        Assertions.assertTrue(ready3.readyNodes.contains(this.node1), "Node1 is ready");
        Map drain4 = recordAccumulator.drain(metadataSnapshot3, ready3.readyNodes, 999999, j4);
        Assertions.assertTrue(drain4.containsKey(Integer.valueOf(this.node1.id())) && ((List) drain4.get(Integer.valueOf(this.node1.id()))).size() == 1, "Node1 has 1 batch ready & drained");
        ProducerBatch producerBatch3 = (ProducerBatch) ((List) drain4.get(Integer.valueOf(this.node1.id()))).get(0);
        Assertions.assertEquals(OptionalInt.of(i), producerBatch3.currentLeaderEpoch());
        Assertions.assertEquals(1, producerBatch3.attemptsWhenLeaderLastChanged());
        recordAccumulator.reenqueue(producerBatch3, j4);
        long j5 = j4 + (2 * 1000);
        int i2 = i + 1;
        MetadataSnapshot metadataSnapshot4 = new MetadataSnapshot((String) null, this.nodes, Collections.singletonList(new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.of(Integer.valueOf(i2)), (List) null, (List) null, (List) null)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap());
        RecordAccumulator.ReadyCheckResult ready4 = recordAccumulator.ready(metadataSnapshot4, j5);
        Assertions.assertTrue(ready4.readyNodes.contains(this.node1), "Node1 is ready");
        Map drain5 = recordAccumulator.drain(metadataSnapshot4, ready4.readyNodes, 999999, j5);
        Assertions.assertTrue(drain5.containsKey(Integer.valueOf(this.node1.id())) && ((List) drain5.get(Integer.valueOf(this.node1.id()))).size() == 1, "Node1 has 1 batch ready & drained");
        ProducerBatch producerBatch4 = (ProducerBatch) ((List) drain5.get(Integer.valueOf(this.node1.id()))).get(0);
        Assertions.assertEquals(OptionalInt.of(i2), producerBatch4.currentLeaderEpoch());
        Assertions.assertEquals(3, producerBatch4.attemptsWhenLeaderLastChanged());
        recordAccumulator.reenqueue(producerBatch4, j5);
    }

    @Test
    public void testDrainWithANodeThatDoesntHostAnyPartitions() {
        Assertions.assertTrue(((List) createTestRecordAccumulator(10, 10240L, Compression.NONE, 10).drain(new MetadataSnapshot((String) null, this.nodes, Collections.singletonList(new MetadataResponse.PartitionMetadata(Errors.NONE, this.tp1, Optional.of(Integer.valueOf(this.node1.id())), Optional.empty(), (List) null, (List) null, (List) null)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), (Node) null, Collections.emptyMap()), new HashSet(Collections.singletonList(this.node2)), 999999, this.time.milliseconds()).get(Integer.valueOf(this.node2.id()))).isEmpty());
    }

    private int prepareSplitBatches(RecordAccumulator recordAccumulator, long j, int i, int i2) throws InterruptedException {
        Random random = new Random();
        random.setSeed(j);
        CompressionRatioEstimator.setEstimation(this.tp1.topic(), CompressionType.GZIP, 0.1f);
        for (int i3 = 0; i3 < i2; i3++) {
            recordAccumulator.append("test", 0, 0L, (byte[]) null, bytesWithPoorCompression(random, i), Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 0L, this.time.milliseconds(), this.cluster);
        }
        RecordAccumulator.ReadyCheckResult ready = recordAccumulator.ready(this.metadataCache, this.time.milliseconds());
        Assertions.assertFalse(ready.readyNodes.isEmpty());
        Map drain = recordAccumulator.drain(this.metadataCache, ready.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals(1, drain.size());
        Assertions.assertEquals(1, ((List) drain.values().iterator().next()).size());
        ProducerBatch producerBatch = (ProducerBatch) ((List) drain.values().iterator().next()).get(0);
        int splitAndReenqueue = recordAccumulator.splitAndReenqueue(producerBatch);
        recordAccumulator.deallocate(producerBatch);
        return splitAndReenqueue;
    }

    private BatchDrainedResult completeOrSplitBatches(RecordAccumulator recordAccumulator, int i) {
        boolean z;
        int i2 = 0;
        int i3 = 0;
        do {
            z = false;
            Iterator it = recordAccumulator.drain(this.metadataCache, recordAccumulator.ready(this.metadataCache, this.time.milliseconds()).readyNodes, Integer.MAX_VALUE, this.time.milliseconds()).values().iterator();
            while (it.hasNext()) {
                for (ProducerBatch producerBatch : (List) it.next()) {
                    z = true;
                    i3++;
                    if (producerBatch.estimatedSizeInBytes() > i + 61) {
                        recordAccumulator.splitAndReenqueue(producerBatch);
                        i2++;
                    } else {
                        producerBatch.complete(0L, 0L);
                    }
                    recordAccumulator.deallocate(producerBatch);
                }
            }
        } while (z);
        return new BatchDrainedResult(i2, i3);
    }

    private byte[] bytesWithGoodCompression(Random random) {
        byte[] bArr = new byte[100];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        while (wrap.remaining() > 0) {
            wrap.putInt(random.nextInt(1000));
        }
        return bArr;
    }

    private byte[] bytesWithPoorCompression(Random random, int i) {
        byte[] bArr = new byte[i];
        random.nextBytes(bArr);
        return bArr;
    }

    private int expectedNumAppends(int i) {
        int i2 = 0;
        int i3 = 0;
        while (true) {
            int sizeInBytes = DefaultRecord.sizeInBytes(i3, 0L, this.key.length, this.value.length, Record.EMPTY_HEADERS);
            if (i2 + sizeInBytes > i) {
                return i3;
            }
            i3++;
            i2 += sizeInBytes;
        }
    }

    private int expectedNumAppendsNoKey(int i) {
        int i2 = 0;
        int i3 = 0;
        while (true) {
            int sizeInBytes = DefaultRecord.sizeInBytes(i3, 0L, 0, this.value.length, Record.EMPTY_HEADERS);
            if (i2 + sizeInBytes > i) {
                return i3;
            }
            i3++;
            i2 += sizeInBytes;
        }
    }

    private RecordAccumulator createTestRecordAccumulator(int i, long j, Compression compression, int i2) {
        return createTestRecordAccumulator(3200, i, j, compression, i2);
    }

    private RecordAccumulator createTestRecordAccumulator(int i, int i2, long j, Compression compression, int i3) {
        return createTestRecordAccumulator(null, i, i2, j, compression, i3);
    }

    private RecordAccumulator createTestRecordAccumulator(TransactionManager transactionManager, int i, int i2, long j, Compression compression, int i3) {
        return new RecordAccumulator(this.logContext, i2, compression, i3, 100L, 1000L, i, this.metrics, "producer-metrics", this.time, new ApiVersions(), transactionManager, new BufferPool(j, i2, this.metrics, this.time, "producer-metrics")) { // from class: org.apache.kafka.clients.producer.internals.RecordAccumulatorTest.4
            BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String str, int i4) {
                return new SequentialPartitioner(logContext, str, i4);
            }
        };
    }
}
