/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerTestUtils;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.SenderMetricsRegistry;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SenderTest {
    // Sizing, acknowledgement and timing values available to the tests in this class.
    // NOTE(review): the units of the constants without an _MS suffix are not visible here;
    // presumably bytes for MAX_REQUEST_SIZE and milliseconds for the timeouts - confirm.
    // 0x100000 == 1,048,576.
    private static final int MAX_REQUEST_SIZE = 0x100000;
    private static final short ACKS_ALL = -1;
    private static final String CLIENT_ID = "clientId";
    // Tolerance used when comparing floating point metric values.
    private static final double EPS = 1.0E-4;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 5000;
    private static final long RETRY_BACKOFF_MS = 50L;
    private static final int DELIVERY_TIMEOUT_MS = 1500;
    private static final long TOPIC_IDLE_MS = 60000L;
    // Partitions 0 and 1 of topic "test"; most tests produce to one of these.
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    // Mock clock. Declared before metadata and client because both are constructed from it.
    private MockTime time = new MockTime();
    private int batchSize = 16384;
    // The 60000L argument has the same value as TOPIC_IDLE_MS (presumably the inlined constant).
    private ProducerMetadata metadata = new ProducerMetadata(0L, Long.MAX_VALUE, 60000L, new LogContext(), new ClusterResourceListeners(), (Time)this.time);
    // Mock network client built on the mock clock and the metadata above.
    private MockClient client = new MockClient((Time)this.time, (Metadata)this.metadata);
    private ApiVersions apiVersions = new ApiVersions();
    // The following start out as null. tearDown() closes metrics, so they are presumably
    // assigned by setupWithTransactionState (not shown in this part of the file).
    private Metrics metrics = null;
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private SenderMetricsRegistry senderMetricsRegistry = null;
    private final LogContext logContext = new LogContext();

    /**
     * Runs before every test and builds the shared fixture by delegating to
     * {@code setupWithTransactionState} with a {@code null} transaction manager.
     */
    @BeforeEach
    public void setup() {
        setupWithTransactionState(null);
    }

    /**
     * Runs after every test and closes the current {@code metrics} instance.
     *
     * <p>The field starts out as {@code null}, so it is checked first: if fixture setup failed
     * before assigning it, teardown must not add a {@link NullPointerException} on top of the
     * original failure.
     */
    @AfterEach
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.close();
        }
    }

    /**
     * Flattens the per-topic, per-partition payload of a produce request into a single map.
     *
     * @param request the produce request whose topic data is inspected
     * @return an unmodifiable map from each topic partition in the request to the records
     *         carried for it (each entry's records are cast to {@code MemoryRecords})
     */
    private static Map<TopicPartition, MemoryRecords> partitionRecords(ProduceRequest request) {
        // Typed map instead of a raw HashMap so the declared return type is checked by the compiler.
        Map<TopicPartition, MemoryRecords> partitionRecords = new HashMap<>();
        request.data().topicData().forEach(tpData -> tpData.partitionData().forEach(p -> {
            TopicPartition tp = new TopicPartition(tpData.name(), p.index());
            partitionRecords.put(tp, (MemoryRecords) p.records());
        }));
        return Collections.unmodifiableMap(partitionRecords);
    }

    /**
     * Appends one record for {@code tp0}, lets the sender dispatch it, answers with a successful
     * produce response and checks that the in-flight bookkeeping and the returned future agree.
     */
    @Test
    public void testSimple() throws Exception {
        final long expectedOffset = 0L;
        FutureRecordMetadata pendingSend = this.appendToAccumulator(this.tp0, 0L, "key", "value");

        // After two sender passes the produce request must be outstanding
        // (presumably the first pass connects and the second one sends).
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
        Assertions.assertTrue(this.client.hasInFlightRequests());

        // Deliver the response and let the sender process it.
        this.client.respond((AbstractResponse) this.produceResponse(this.tp0, expectedOffset, Errors.NONE, 0));
        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount(), "All requests completed.");
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
        Assertions.assertFalse(this.client.hasInFlightRequests());

        this.sender.runOnce();
        Assertions.assertTrue(pendingSend.isDone(), "Request should be completed");
        RecordMetadata acknowledged = (RecordMetadata) pendingSend.get();
        Assertions.assertEquals(expectedOffset, acknowledged.offset());
    }

    /**
     * Appends a record while node "0" advertises the default API versions, then restricts the
     * node to produce versions 0..2 and verifies that the request actually sent uses version 2
     * and carries non-empty records with magic value 1.
     */
    @Test
    public void testMessageFormatDownConversion() throws Exception {
        final long expectedOffset = 0L;

        // The record is appended before the node's supported produce versions are narrowed.
        this.apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata pendingSend = this.appendToAccumulator(this.tp0, 0L, "key", "value");
        this.apiVersions.update("0", NodeApiVersions.create((short) ApiKeys.PRODUCE.id, (short) 0, (short) 2));

        // The prepared response is only handed out if the request matches the expectations below.
        this.client.prepareResponse(body -> {
            ProduceRequest produceRequest = (ProduceRequest) body;
            if (produceRequest.version() == 2) {
                MemoryRecords sent = SenderTest.partitionRecords(produceRequest).get(this.tp0);
                return sent != null && sent.sizeInBytes() > 0 && sent.hasMatchingMagic((byte) 1);
            }
            return false;
        }, (AbstractResponse) this.produceResponse(this.tp0, expectedOffset, Errors.NONE, 0));

        this.sender.runOnce();
        this.sender.runOnce();

        Assertions.assertTrue(pendingSend.isDone(), "Request should be completed");
        RecordMetadata acknowledged = (RecordMetadata) pendingSend.get();
        Assertions.assertEquals(expectedOffset, acknowledged.offset());
    }

    /**
     * Appends one record per partition while the advertised API versions of node "0" change
     * between the appends, then verifies that a single version-2 produce request carries both
     * partitions and that every batch in it is non-empty and has magic value 1.
     */
    @Test
    public void testDownConversionForMismatchedMagicValues() throws Exception {
        final long baseOffset = 0L;

        // tp0 is appended under the default versions, tp1 while produce is limited to 0..2.
        this.apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata firstSend = this.appendToAccumulator(this.tp0, 0L, "key", "value");
        this.apiVersions.update("0", NodeApiVersions.create((short) ApiKeys.PRODUCE.id, (short) 0, (short) 2));
        FutureRecordMetadata secondSend = this.appendToAccumulator(this.tp1, 0L, "key", "value");
        this.apiVersions.update("0", NodeApiVersions.create());

        // Both partitions are answered with the same successful partition response.
        ProduceResponse.PartitionResponse success = new ProduceResponse.PartitionResponse(Errors.NONE, baseOffset, -1L, 100L);
        Map<TopicPartition, ProduceResponse.PartitionResponse> responsesByPartition = new HashMap<>();
        responsesByPartition.put(this.tp0, success);
        responsesByPartition.put(this.tp1, success);
        ProduceResponse produceResponse = new ProduceResponse(responsesByPartition, 0);

        this.client.prepareResponse(body -> {
            ProduceRequest produceRequest = (ProduceRequest) body;
            if (produceRequest.version() != 2) {
                return false;
            }
            Map<TopicPartition, MemoryRecords> sentByPartition = SenderTest.partitionRecords(produceRequest);
            if (sentByPartition.size() != 2) {
                return false;
            }
            for (MemoryRecords sent : sentByPartition.values()) {
                boolean downConverted = sent != null && sent.sizeInBytes() != 0 && sent.hasMatchingMagic((byte) 1);
                if (!downConverted) {
                    return false;
                }
            }
            return true;
        }, (AbstractResponse) produceResponse);

        this.sender.runOnce();
        this.sender.runOnce();

        Assertions.assertTrue(firstSend.isDone(), "Request should be completed");
        Assertions.assertTrue(secondSend.isDone(), "Request should be completed");
    }

    /**
     * Drives a real {@code NetworkClient} over a {@code MockSelector} and verifies the
     * produce-throttle-time average and maximum metrics recorded through the sender's throttle
     * time sensor.
     *
     * <p>The network client is closed in a {@code finally} block so that it is released even
     * when an assertion fails.
     */
    @Test
    public void testQuotaMetrics() {
        MockSelector selector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
        Cluster cluster = TestUtils.singletonCluster("test", 1);
        Node node = (Node) cluster.nodes().get(0);
        // Named networkClient so that it does not shadow the MockClient field "client".
        NetworkClient networkClient = new NetworkClient((Selectable) selector, (Metadata) this.metadata, "mock", Integer.MAX_VALUE,
                1000L, 1000L, 65536, 65536, 1000, 10000L, 127000L, (Time) this.time, true, new ApiVersions(),
                throttleTimeSensor, this.logContext);
        try {
            // Queue an ApiVersions response (built with the value 400) and poll until the node is ready.
            ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(400, ApiMessageType.ListenerType.ZK_BROKER);
            ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
            while (!networkClient.ready(node, this.time.milliseconds())) {
                networkClient.poll(1L, this.time.milliseconds());
                // Skip past any throttle delay so the next readiness check can succeed.
                this.time.sleep(networkClient.throttleDelayMs(node, this.time.milliseconds()));
            }
            selector.clear();

            // Three produce round trips answered with throttle times of 100, 200 and 300.
            for (int i = 1; i <= 3; ++i) {
                int throttleTimeMs = 100 * i;
                ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData()
                        .setTopicData(new ProduceRequestData.TopicProduceDataCollection())
                        .setAcks((short) 1)
                        .setTimeoutMs(1000));
                ClientRequest request = networkClient.newClientRequest(node.idString(), builder, this.time.milliseconds(), true);
                networkClient.send(request, this.time.milliseconds());
                networkClient.poll(1L, this.time.milliseconds());
                ProduceResponse response = this.produceResponse(this.tp0, i, Errors.NONE, throttleTimeMs);
                buffer = RequestTestUtils.serializeResponseWithHeader(response, ApiKeys.PRODUCE.latestVersion(), request.correlationId());
                selector.completeReceive(new NetworkReceive(node.idString(), buffer));
                networkClient.poll(1L, this.time.milliseconds());
                this.time.sleep(networkClient.throttleDelayMs(node, this.time.milliseconds()));
                selector.clear();
            }

            Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
            KafkaMetric avgMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeAvg);
            KafkaMetric maxMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeMax);
            // Presumably the 400 of the ApiVersions response is recorded as a throttle time too:
            // (400 + 100 + 200 + 300) / 4 = 250 and max = 400.
            Assertions.assertEquals(250.0, (double) ((Double) avgMetric.metricValue()), 1.0E-4);
            Assertions.assertEquals(400.0, (double) ((Double) maxMetric.metricValue()), 1.0E-4);
        } finally {
            networkClient.close();
        }
    }

    /**
     * Verifies that the metrics registered after one produce round trip correspond exactly to
     * the templates reported by {@code SenderMetricsRegistry.allTemplates()}.
     *
     * <p>The literals 0x100000, -1, 5000 and 50L passed to the {@code Sender} constructor have
     * the same values as MAX_REQUEST_SIZE, ACKS_ALL, REQUEST_TIMEOUT and RETRY_BACKOFF_MS.
     */
    @Test
    public void testSenderMetricsTemplates() throws Exception {
        // Replace the fixture's metrics with an instance tagged with a client id;
        // tearDown() closes the replacement.
        this.metrics.close();
        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
        this.metrics = new Metrics(new MetricConfig().tags(clientTags));
        SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(this.metrics);
        // The acks argument is cast explicitly: an int literal is not narrowed to short in a
        // method argument. The local is named so that it does not shadow the "sender" field.
        Sender taggedSender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false,
                0x100000, (short) -1, 1, metricsRegistry, this.time, 5000, 50L, null, this.apiVersions);

        // One successful produce round trip.
        this.appendToAccumulator(this.tp0, 0L, "key", "value");
        taggedSender.runOnce();
        taggedSender.runOnce();
        this.client.respond(this.produceResponse(this.tp0, 0L, Errors.NONE, 0));
        taggedSender.runOnce();

        // Called for its side effect only; presumably registers the throttle-time metrics.
        Sender.throttleTimeSensor(metricsRegistry);

        // Convert every registered metric name (except the metrics-count group) into a template.
        HashSet<MetricNameTemplate> allMetrics = new HashSet<MetricNameTemplate>();
        for (MetricName n : this.metrics.metrics().keySet()) {
            if (n.group().equals("kafka-metrics-count")) {
                continue;
            }
            allMetrics.add(new MetricNameTemplate(n.name(), n.group(), "", n.tags().keySet()));
        }
        TestUtils.checkEquals(allMetrics, new HashSet<MetricNameTemplate>(metricsRegistry.allTemplates()), "metrics", "templates");
    }

    /**
     * Exercises the retry path of a sender configured with one retry: first a disconnect that
     * is followed by a successful resend, then repeated disconnects after which the future
     * fails with a {@code NetworkException}.
     *
     * <p>The dedicated {@code Metrics} instance is managed with try-with-resources so it is
     * closed even if building the registry or the sender fails. The literals 0x100000, -1, 5000
     * and 50L have the same values as MAX_REQUEST_SIZE, ACKS_ALL, REQUEST_TIMEOUT and
     * RETRY_BACKOFF_MS.
     */
    @Test
    public void testRetries() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // The acks argument is cast explicitly: an int literal is not narrowed to short in a
            // method argument.
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false,
                    0x100000, (short) -1, maxRetries, senderMetrics, this.time, 5000, 50L, null, this.apiVersions);

            // --- A retry that succeeds ---
            FutureRecordMetadata future = this.appendToAccumulator(this.tp0, 0L, "key", "value");
            sender.runOnce();
            sender.runOnce();
            String id = this.client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertEquals(1, sender.inFlightBatches(this.tp0).size());
            Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be true");

            // Disconnecting drops the request from the client; the sender still tracks the
            // batch until it runs again.
            this.client.disconnect(id);
            Assertions.assertEquals(0, this.client.inFlightRequestCount());
            Assertions.assertFalse(this.client.hasInFlightRequests());
            Assertions.assertFalse(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be false");
            Assertions.assertEquals(1, sender.inFlightBatches(this.tp0).size());

            // Three passes later the batch has been sent again.
            sender.runOnce();
            sender.runOnce();
            sender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertEquals(1, sender.inFlightBatches(this.tp0).size());

            long offset = 0L;
            this.client.respond(this.produceResponse(this.tp0, offset, Errors.NONE, 0));
            sender.runOnce();
            Assertions.assertTrue(future.isDone(), "Request should have retried and completed");
            Assertions.assertEquals(offset, ((RecordMetadata) future.get()).offset());
            Assertions.assertEquals(0, sender.inFlightBatches(this.tp0).size());

            // --- A retry that fails ---
            future = this.appendToAccumulator(this.tp0, 0L, "key", "value");
            sender.runOnce();
            Assertions.assertEquals(1, sender.inFlightBatches(this.tp0).size());
            for (int i = 0; i < maxRetries + 1; ++i) {
                this.client.disconnect(this.client.requests().peek().destination());
                sender.runOnce();
                Assertions.assertEquals(0, sender.inFlightBatches(this.tp0).size());
                sender.runOnce();
                sender.runOnce();
                // Only the first disconnect is followed by a resend; afterwards nothing is in flight.
                Assertions.assertEquals(i > 0 ? 0 : 1, sender.inFlightBatches(this.tp0).size());
            }
            sender.runOnce();
            this.assertFutureFailure(future, NetworkException.class);
            Assertions.assertEquals(0, sender.inFlightBatches(this.tp0).size());
        }
    }

    /**
     * Verifies that a sender constructed with its boolean flag set to {@code true} (presumably
     * "guarantee message order") keeps a single request in flight for a partition: a second
     * record appended while the first batch is outstanding is not sent, even after a metadata
     * update.
     *
     * <p>The dedicated {@code Metrics} instance is managed with try-with-resources so it is
     * closed even if building the registry or the sender fails. The literals 0x100000, -1, 5000
     * and 50L have the same values as MAX_REQUEST_SIZE, ACKS_ALL, REQUEST_TIMEOUT and
     * RETRY_BACKOFF_MS.
     */
    @Test
    public void testSendInOrder() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // The acks argument is cast explicitly: an int literal is not narrowed to short in a
            // method argument.
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                    0x100000, (short) -1, maxRetries, senderMetrics, this.time, 5000, 50L, null, this.apiVersions);

            // First metadata view; presumably two nodes and two partitions of topic "test".
            MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
            this.client.prepareMetadataUpdate(metadataUpdate1);

            // Same topic and partition as the tp1 field.
            TopicPartition tp2 = new TopicPartition("test", 1);
            this.appendToAccumulator(tp2, 0L, "key1", "value1");
            sender.runOnce();
            sender.runOnce();
            String id = this.client.requests().peek().destination();
            Assertions.assertEquals((Object) ApiKeys.PRODUCE, (Object) this.client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be true");
            Assertions.assertEquals(1, sender.inFlightBatches(tp2).size());

            // Append a second record while the first batch is outstanding and change the metadata.
            this.time.sleep(900L);
            this.appendToAccumulator(tp2, 0L, "key2", "value2");
            MetadataResponse metadataUpdate2 = RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2));
            this.client.prepareMetadataUpdate(metadataUpdate2);
            Assertions.assertEquals(1, sender.inFlightBatches(tp2).size());

            // Still exactly one request and one batch in flight after another pass.
            sender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertEquals(1, sender.inFlightBatches(tp2).size());
        }
    }

    /**
     * Verifies that a completion callback may append new records to the accumulator while it
     * is being invoked with a {@code TimeoutException}: every one of the original records gets
     * its callback invoked, and the records appended from those callbacks end up together in a
     * single new batch for {@code tp1}.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testAppendInExpiryCallback() throws InterruptedException {
        int messagesPerBatch = 10;
        final AtomicInteger expiryCallbackCount = new AtomicInteger(0);
        final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
        final byte[] key = "key".getBytes();
        final byte[] value = "value".getBytes();
        final long maxBlockTimeMs = 1000L;
        final Cluster cluster = TestUtils.singletonCluster();
        RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() {

            @Override
            public void setPartition(int partition) {
                // Nothing to do for this test.
            }

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof TimeoutException) {
                    expiryCallbackCount.incrementAndGet();
                    try {
                        // Re-append from inside the expiry callback; this is the behaviour under test.
                        SenderTest.this.accumulator.append(SenderTest.this.tp1.topic(), SenderTest.this.tp1.partition(), 0L, key, value,
                                Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, SenderTest.this.time.milliseconds(), cluster);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Unexpected interruption", e);
                    }
                } else if (exception != null) {
                    // Remember only the first unexpected failure.
                    unexpectedException.compareAndSet(null, exception);
                }
            }
        };

        long nowMs = this.time.milliseconds();
        for (int i = 0; i < messagesPerBatch; ++i) {
            this.accumulator.append(this.tp1.topic(), this.tp1.partition(), 0L, key, value, null, callbacks,
                    maxBlockTimeMs, false, nowMs, cluster);
        }

        // Advance the mock clock by 10 seconds (well beyond DELIVERY_TIMEOUT_MS = 1500) so the
        // drained batch is expired on the next sender pass.
        this.time.sleep(10000L);

        Node clusterNode = (Node) this.metadata.fetch().nodes().get(0);
        Map<Integer, List<ProducerBatch>> drainedBatches = this.accumulator.drain(this.metadata.fetch(),
                Collections.singleton(clusterNode), Integer.MAX_VALUE, this.time.milliseconds());
        this.sender.addToInflightBatches(drainedBatches);

        // Disconnect the node and back it off so the sender cannot send during this pass.
        this.client.disconnect(clusterNode.idString());
        this.client.backoff(clusterNode, 100L);

        this.sender.runOnce();
        Assertions.assertEquals(messagesPerBatch, expiryCallbackCount.get(), "Callbacks not invoked for expiry");
        Assertions.assertNull(unexpectedException.get(), "Unexpected exception");
        // The records appended by the callbacks form exactly one batch.
        Assertions.assertNotNull((Object) this.accumulator.getDeque(this.tp1));
        Assertions.assertEquals(1, this.accumulator.getDeque(this.tp1).size());
        Assertions.assertEquals(messagesPerBatch, ((ProducerBatch) this.accumulator.getDeque(this.tp1).peekFirst()).recordCount);
    }

    /**
     * Checks how the producer metadata tracks topic "test": the topic is present once a record
     * is appended and sent, stays present after the send completes, is gone after 60000 ms of
     * mock time followed by a metadata update, and is present again after the next send.
     */
    @Test
    public void testMetadataTopicExpiry() throws Exception {
        final long firstOffset = 0L;

        // First send: the topic is added to the metadata.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        FutureRecordMetadata pendingSend = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertTrue(this.metadata.containsTopic(this.tp0.topic()), "Topic not added to metadata");

        this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        this.sender.runOnce();
        this.client.respond((AbstractResponse) this.produceResponse(this.tp0, firstOffset, Errors.NONE, 0));
        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount(), "Request completed.");
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        this.sender.runOnce();
        Assertions.assertTrue(pendingSend.isDone(), "Request should be completed");
        Assertions.assertTrue(this.metadata.containsTopic(this.tp0.topic()), "Topic not retained in metadata list");

        // 60000 ms has the same value as TOPIC_IDLE_MS; after that long without use, the next
        // metadata update no longer contains the topic.
        this.time.sleep(60000L);
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        Assertions.assertFalse(this.metadata.containsTopic(this.tp0.topic()), "Unused topic has not been expired");

        // Second send: the topic is added again and the record completes.
        pendingSend = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertTrue(this.metadata.containsTopic(this.tp0.topic()), "Topic not added to metadata");

        this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        this.sender.runOnce();
        this.client.respond((AbstractResponse) this.produceResponse(this.tp0, firstOffset + 1L, Errors.NONE, 0));
        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount(), "Request completed.");
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        this.sender.runOnce();
        Assertions.assertTrue(pendingSend.isDone(), "Request should be completed");
    }

    /**
     * Verifies how the accumulator's per-node latency statistics ({@code readyTimeMs} and
     * {@code drainTimeMs}) for node 0 evolve while the node is throttled and once the throttle
     * period has elapsed.
     *
     * <p>Note that this test replaces the {@code accumulator} field with an instance bound to a
     * local {@code Metrics} object and uses a local sender built on it. The literals 0x100000,
     * -1, 5000 and 1500 have the same values as MAX_REQUEST_SIZE, ACKS_ALL, REQUEST_TIMEOUT and
     * DELIVERY_TIMEOUT_MS.
     */
    @Test
    public void testNodeLatencyStats() throws Exception {
        try (Metrics m = new Metrics()) {
            RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(false, 42L);
            long totalSize = 0x100000L;
            this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.NONE, 0, 0L, 1500,
                    config, m, "producer-metrics", this.time, this.apiVersions, null,
                    new BufferPool(totalSize, this.batchSize, m, this.time, "producer-internal-metrics"));
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // The acks argument is cast explicitly: an int literal is not narrowed to short in a
            // method argument.
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false,
                    0x100000, (short) -1, 1, senderMetrics, this.time, 5000, 1000L, null, new ApiVersions());

            // The first record is sent right away: ready and drain times equal the send time.
            long time1 = this.time.milliseconds();
            this.appendToAccumulator(this.tp0, 0L, "key", "value");
            sender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            RecordAccumulator.NodeLatencyStats stats = this.accumulator.getNodeLatencyStats(Integer.valueOf(0));
            Assertions.assertEquals(time1, stats.drainTimeMs);
            Assertions.assertEquals(time1, stats.readyTimeMs);

            // Throttle node 0 for 100 ms; with nothing new to send, neither timestamp moves.
            this.client.throttle(this.metadata.fetch().nodeById(0), 100L);
            this.time.sleep(10L);
            sender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            Assertions.assertEquals(time1, stats.drainTimeMs);
            Assertions.assertEquals(time1, stats.readyTimeMs);

            // A new record while throttled: the ready time advances, the drain time does not.
            long time2 = this.time.milliseconds();
            this.appendToAccumulator(this.tp0, 0L, "key", "value");
            sender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            Assertions.assertEquals(time1, stats.drainTimeMs);
            Assertions.assertEquals(time2, stats.readyTimeMs);

            // Still throttled 10 ms later: the ready time keeps following the clock.
            this.time.sleep(10L);
            time2 = this.time.milliseconds();
            sender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            Assertions.assertEquals(time1, stats.drainTimeMs);
            Assertions.assertEquals(time2, stats.readyTimeMs);

            // After another 100 ms the second request goes out and both timestamps catch up.
            this.time.sleep(100L);
            time2 = this.time.milliseconds();
            sender.runOnce();
            Assertions.assertEquals(2, this.client.inFlightRequestCount(), "We should have 2 produce requests in flight.");
            Assertions.assertEquals(time2, stats.drainTimeMs);
            Assertions.assertEquals(time2, stats.readyTimeMs);
        }
    }

    /**
     * Verifies that after a successful InitProducerId exchange the transaction manager reports
     * the producer id that was returned, with epoch 0.
     */
    @Test
    public void testInitProducerIdRequest() {
        // Single source of truth for the id that is both prepared and asserted.
        final long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals((short) 0, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Verifies that a transactional producer still obtains its producer id when the mock client
     * created by {@code createMockClientWithMaxFlightOneMetadataPending()} is in use
     * (judging by the helper's name: at most one in-flight request, with a metadata request
     * already pending).
     */
    @Test
    public void testInitProducerIdWithMaxInFlightOne() throws Exception {
        final long producerId = 123456L;
        this.createMockClientWithMaxFlightOneMetadataPending();
        TransactionManager transactionManager = new TransactionManager(new LogContext(),
                "testInitProducerIdWithPendingMetadataRequest", 60000, 100L, new ApiVersions());
        this.setupWithTransactionState(transactionManager, false, null, false);
        // The epoch is cast explicitly: an int literal is not narrowed to short in a method argument.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, (short) 0);
        transactionManager.initializeTransactions();
        this.sender.runOnce();

        // Answer the outstanding request with a metadata response, then queue the coordinator
        // lookup and InitProducerId responses.
        MetadataResponse metadataUpdate = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        this.client.respond(metadataUpdate);
        this.prepareFindCoordinatorResponse(Errors.NONE, "testInitProducerIdWithPendingMetadataRequest");
        this.prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
        this.waitForProducerId(transactionManager, producerIdAndEpoch);
    }

    /**
     * Variant of the max-in-flight-one scenario that uses the transaction manager returned by
     * {@code createTransactionManager()} (the idempotent case, going by the test name): the
     * producer id must still be obtained once the pending metadata request has been answered.
     */
    @Test
    public void testIdempotentInitProducerIdWithMaxInFlightOne() throws Exception {
        final long producerId = 123456L;
        this.createMockClientWithMaxFlightOneMetadataPending();
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager, false, null, false);
        // The epoch is cast explicitly: an int literal is not narrowed to short in a method argument.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, (short) 0);

        // Answer the outstanding request with a metadata response before the sender runs.
        MetadataResponse metadataUpdate = RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap());
        this.client.respond(metadataUpdate);
        this.sender.runOnce();
        this.sender.runOnce();

        this.client.respond(this.initProducerIdResponse(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, Errors.NONE));
        this.waitForProducerId(transactionManager, producerIdAndEpoch);
    }

    /**
     * Replaces the fixture's time and client with a {@code MockTime(10)} backed
     * {@code MockClient}, delays readiness of the first cluster node, and checks that a
     * transaction coordinator is still found. The node is then throttled and the test waits (via
     * {@code waitForProducerId}) for the expected producer id and epoch.
     */
    @Test
    public void testNodeNotReady() {
        long producerId = 123456L;
        String transactionalId = "testNodeNotReady";

        this.time = new MockTime(10L);
        this.client = new MockClient(this.time, this.metadata);
        TransactionManager transactionManager = new TransactionManager(new LogContext(), transactionalId, 60000, 100L, new ApiVersions());
        this.setupWithTransactionState(transactionManager, false, null, true);
        // The epoch is compared as a short elsewhere in this class, so pass it as one explicitly.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, (short) 0);

        transactionManager.initializeTransactions();
        this.sender.runOnce();

        // Make the first node not ready for a while, then look up the coordinator.
        Node node = this.metadata.fetch().nodes().get(0);
        this.client.delayReady(node, 5020L);
        this.prepareFindCoordinatorResponse(Errors.NONE, transactionalId);
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION), "Coordinator not found");

        this.client.throttle(node, 5020L);
        // NOTE(review): the second argument below is the assertion message used above rather than
        // "testNodeNotReady" as in the first call; left unchanged, confirm whether this is intended.
        this.prepareFindCoordinatorResponse(Errors.NONE, "Coordinator not found");
        this.prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
        this.waitForProducerId(transactionManager, producerIdAndEpoch);
    }

    /**
     * Checks the transaction manager's state after an InitProducerId exchange that carries
     * {@code CLUSTER_AUTHORIZATION_FAILED}: no producer id is held, an error is recorded, and that
     * error is a {@code ClusterAuthorizationException}.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);

        this.prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);

        Assertions.assertFalse(transactionManager.hasProducerId());
        Assertions.assertTrue(transactionManager.hasError());
        Assertions.assertTrue(transactionManager.lastError() instanceof ClusterAuthorizationException);
        this.assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * Appends one record without a transaction manager, checks that the resulting produce request
     * contains no idempotent records, answers it with {@code TOPIC_AUTHORIZATION_FAILED}, and
     * verifies that the record's future fails with a {@code TopicAuthorizationException} cause.
     *
     * <p>The failure is asserted with {@code assertThrows} so that the test fails if
     * {@code future.get()} unexpectedly returns normally.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testCanRetryWithoutIdempotence() throws Exception {
        FutureRecordMetadata future = this.appendToAccumulator(this.tp0, 0L, "key", "value");
        this.sender.runOnce();
        this.sender.runOnce();

        String id = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(id), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertTrue(this.client.hasInFlightRequests());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be true");
        Assertions.assertFalse(future.isDone());

        // The matcher inspects the outgoing request before the error response is delivered.
        this.client.respond(body -> {
            ProduceRequest request = (ProduceRequest) body;
            Assertions.assertFalse(RequestTestUtils.hasIdempotentRecords(request));
            return true;
        }, this.produceResponse(this.tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
        this.sender.runOnce();

        Assertions.assertTrue(future.isDone());
        // Require the failure: a normally completing get() must not let the test pass.
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> future.get());
        Assertions.assertTrue(failure.getCause() instanceof TopicAuthorizationException);
    }

    /**
     * Sends two batches for {@code tp0} back to back after a producer id has been obtained, then
     * delivers two successful responses and checks the in-flight counts, the partition's sequence
     * number and the last acked sequence after every step.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testIdempotenceWithMultipleInflights() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First batch: one request in flight, sequence advanced to 1, nothing acked.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Second batch: two requests in flight, sequence advanced to 2, still nothing acked.
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        // First success completes request1 only.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(0L, request1.get().offset());
        Assertions.assertFalse(request2.isDone());

        // Second success completes request2 and drains the in-flight state.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1L, request2.get().offset());
    }

    /**
     * Puts three batches for {@code tp0} in flight, fails them with
     * {@code LEADER_NOT_AVAILABLE} / {@code OUT_OF_ORDER_SEQUENCE_NUMBER} responses while a fourth
     * batch is appended, and then checks that successful responses are acknowledged one sequence
     * at a time (0, 1, 2, 3) with the expected in-flight counts in between.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testIdempotenceWithMultipleInflightsRetriedInOrder() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First batch.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Second and third batches: three requests in flight, nothing acked.
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        FutureRecordMetadata request3 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(3, this.client.inFlightRequestCount());
        Assertions.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertFalse(request3.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        // Error response for the first request; a fourth batch is appended afterwards.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
        this.sender.runOnce();
        FutureRecordMetadata request4 = this.appendToAccumulator(this.tp0);
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Error responses for the second and third requests.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Another sender pass leaves a single request in flight and the sequence at 3.
        this.sender.runOnce();
        Assertions.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Success for sequence 0 completes request1.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(0L, request1.get().offset());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // Next pass sends one request; success for sequence 1 completes request2.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1L, request2.get().offset());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // Next pass sends one request; success for sequence 2 completes request3.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.NONE, 2L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(2), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request3.isDone());
        Assertions.assertEquals(2L, request3.get().offset());
        // A request is already in flight again at this point.
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        // Success for sequence 3 completes request4.
        this.sendIdempotentProducerResponse(3, this.tp0, Errors.NONE, 3L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(3), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request4.isDone());
        Assertions.assertEquals(3L, request4.get().offset());
    }

    /**
     * Puts two batches for {@code tp0} in flight, fails the first with {@code MESSAGE_TOO_LARGE}
     * (its future must fail with {@code RecordTooLargeException}) and the second with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER}. It then checks that the second batch is sent again
     * and completes at offset 0 once a successful response for sequence 0 arrives.
     *
     * <p>The final completion check targets {@code request2}: {@code request1} was already shown
     * to have failed, and {@code request2.get()} is the call that would block if that future
     * were not complete.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenceOfFutureBatchesIsAdjusted() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First batch.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Second batch.
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        // The first request fails with MESSAGE_TOO_LARGE and its future completes exceptionally.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.MESSAGE_TOO_LARGE, -1L);
        this.sender.runOnce();
        this.assertFutureFailure(request1, RecordTooLargeException.class);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // The second request is rejected as out of order.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());

        // The next sender pass puts one request back in flight.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // A successful response for sequence 0 completes the second record at offset 0.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        // Guard the blocking get() below by checking the future that is actually read.
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(0L, request2.get().offset());
    }

    /**
     * Sends a two-record request for {@code tp0}, acknowledges it, sends a further record and
     * answers that one with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}. Afterwards the producer epoch
     * must be 1, the partition's sequence number 1 and its first in-flight sequence 0.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testEpochBumpOnOutOfOrderSequenceForNextBatch() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // Two records are appended before the first sender pass; a single request goes out.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Acknowledge that request, then send one more record.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(0L, request1.get().offset());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        // The follow-up request is rejected as out of order; two sender passes follow.
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        this.sender.runOnce();

        Assertions.assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
        // Explicit unboxing casts are kept so the int overload of assertEquals is selected.
        Assertions.assertEquals(1, (int) transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals(0, (int) transactionManager.firstInFlightSequence(this.tp0));
    }

    /**
     * Produces one acknowledged batch to each of {@code tp0} and {@code tp1}, then has a second
     * {@code tp0} batch rejected with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}. The test asserts that
     * the producer epoch becomes 1, that {@code tp0} immediately reflects the new epoch while
     * {@code tp1} is reported as stale, and that {@code tp1} picks up the new epoch once another
     * batch is sent to it.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testEpochBumpOnOutOfOrderSequenceForNextBatchWhenThereIsNoBatchInFlight() throws Exception {
        long producerId = 343434L;
        short initialEpoch = 0;
        short bumpedEpoch = 1;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(0, transactionManager.producerIdAndEpoch().epoch);

        // tp0: first batch sent and acknowledged.
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, initialEpoch, 1L, OptionalInt.empty());
        this.sendIdempotentProducerResponse(0, 0, this.tp0, Errors.NONE, 0L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, initialEpoch, 1L, OptionalInt.of(0));

        // tp1: first batch sent and acknowledged.
        this.appendToAccumulator(this.tp1);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 1L, OptionalInt.empty());
        this.sendIdempotentProducerResponse(0, 0, this.tp1, Errors.NONE, 0L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 1L, OptionalInt.of(0));

        // tp0: second batch sent.
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, initialEpoch, 2L, OptionalInt.of(0));

        // tp0: second batch rejected as out of order; two sender passes follow.
        this.sendIdempotentProducerResponse(0, 1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L, -1L);
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
        this.assertPartitionState(transactionManager, this.tp0, producerId, bumpedEpoch, 1L, OptionalInt.empty());
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 1L, OptionalInt.of(0));
        Assertions.assertTrue(transactionManager.hasStaleProducerIdAndEpoch(this.tp1));

        // tp0: acknowledged under the new epoch.
        this.sendIdempotentProducerResponse(1, 0, this.tp0, Errors.NONE, 1L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, bumpedEpoch, 1L, OptionalInt.of(0));

        // tp1: the next batch moves the partition to the new epoch.
        this.appendToAccumulator(this.tp1);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, bumpedEpoch, 1L, OptionalInt.empty());
        Assertions.assertFalse(transactionManager.hasStaleProducerIdAndEpoch(this.tp1));
        this.sendIdempotentProducerResponse(1, 0, this.tp1, Errors.NONE, 1L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, bumpedEpoch, 1L, OptionalInt.of(0));
    }

    /**
     * Like the no-batch-in-flight epoch bump scenario, but {@code tp1} still has a batch
     * outstanding when {@code tp0} is rejected with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}. The
     * {@code tp1} batch is answered twice with {@code NOT_LEADER_OR_FOLLOWER}; the test asserts
     * that {@code tp1} stays on the old epoch (and is reported stale) throughout, and only moves
     * to epoch 1 after a further batch is appended and sent.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testEpochBumpOnOutOfOrderSequenceForNextBatchWhenBatchInFlightFails() throws Exception {
        long producerId = 343434L;
        short initialEpoch = 0;
        short bumpedEpoch = 1;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager, false, null, true, 1, 0);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(0, transactionManager.producerIdAndEpoch().epoch);

        // tp0: first batch sent and acknowledged.
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, initialEpoch, 1L, OptionalInt.empty());
        this.sendIdempotentProducerResponse(0, 0, this.tp0, Errors.NONE, 0L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, initialEpoch, 1L, OptionalInt.of(0));

        // tp1: first batch sent and acknowledged.
        this.appendToAccumulator(this.tp1);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 1L, OptionalInt.empty());
        this.sendIdempotentProducerResponse(0, 0, this.tp1, Errors.NONE, 0L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 1L, OptionalInt.of(0));

        // Second batch sent on each partition.
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, initialEpoch, 2L, OptionalInt.of(0));
        this.appendToAccumulator(this.tp1);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 2L, OptionalInt.of(0));

        // tp0: second batch rejected as out of order; two sender passes follow.
        this.sendIdempotentProducerResponse(0, 1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L, -1L);
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
        this.assertPartitionState(transactionManager, this.tp0, producerId, bumpedEpoch, 1L, OptionalInt.empty());
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 2L, OptionalInt.of(0));
        Assertions.assertTrue(transactionManager.hasStaleProducerIdAndEpoch(this.tp1));

        // tp1: outstanding batch fails with NOT_LEADER_OR_FOLLOWER; partition state is unchanged.
        this.sendIdempotentProducerResponse(0, 1, this.tp1, Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 2L, OptionalInt.of(0));
        Assertions.assertTrue(transactionManager.hasStaleProducerIdAndEpoch(this.tp1));

        // tp0: acknowledged under the new epoch.
        this.sendIdempotentProducerResponse(1, 0, this.tp0, Errors.NONE, 1L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, bumpedEpoch, 1L, OptionalInt.of(0));

        // tp1: the same failure once more; still on the old epoch and still stale.
        this.sendIdempotentProducerResponse(0, 1, this.tp1, Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, initialEpoch, 2L, OptionalInt.of(0));
        Assertions.assertTrue(transactionManager.hasStaleProducerIdAndEpoch(this.tp1));

        // tp1: a further batch moves the partition to the new epoch.
        this.appendToAccumulator(this.tp1);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, bumpedEpoch, 1L, OptionalInt.empty());
        Assertions.assertFalse(transactionManager.hasStaleProducerIdAndEpoch(this.tp1));
        this.sendIdempotentProducerResponse(1, 0, this.tp1, Errors.NONE, 0L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp1, producerId, bumpedEpoch, 1L, OptionalInt.of(0));

        // tp0: one more batch under the new epoch.
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, bumpedEpoch, 2L, OptionalInt.of(0));
        this.sendIdempotentProducerResponse(1, 1, this.tp0, Errors.NONE, 0L, -1L);
        this.sender.runOnce();
        this.assertPartitionState(transactionManager, this.tp0, producerId, bumpedEpoch, 2L, OptionalInt.of(1));
    }

    /**
     * Asserts the per-partition idempotence state held by the transaction manager.
     *
     * @param transactionManager the transaction manager to inspect
     * @param tp the partition whose state is checked
     * @param expectedProducerId producer id expected for {@code tp}
     * @param expectedProducerEpoch producer epoch expected for {@code tp}
     * @param expectedSequenceValue value expected from {@code sequenceNumber(tp)}
     * @param expectedLastAckedSequence value expected from {@code lastAckedSequence(tp)}
     */
    private void assertPartitionState(TransactionManager transactionManager, TopicPartition tp, long expectedProducerId, short expectedProducerEpoch, long expectedSequenceValue, OptionalInt expectedLastAckedSequence) {
        ProducerIdAndEpoch partitionIdAndEpoch = transactionManager.producerIdAndEpoch(tp);
        Assertions.assertEquals(expectedProducerId, partitionIdAndEpoch.producerId, "Producer Id:");
        Assertions.assertEquals(expectedProducerEpoch, partitionIdAndEpoch.epoch, "Producer Epoch:");

        long actualSequence = transactionManager.sequenceNumber(tp).longValue();
        Assertions.assertEquals(expectedSequenceValue, actualSequence, "Seq Number:");

        Assertions.assertEquals(expectedLastAckedSequence, transactionManager.lastAckedSequence(tp), "Last Acked Seq Number:");
    }

    /**
     * Puts two requests for {@code tp0} in flight and answers the second one first (with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER}), then the first (with
     * {@code NOT_LEADER_OR_FOLLOWER}). The test asserts that both batches end up back in the
     * accumulator ordered by base sequence (0 then 1) and are subsequently sent and acknowledged
     * one at a time.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First batch.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Second batch.
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        // Capture both in-flight requests so they can be answered in reverse order.
        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest) this.client.requests().toArray()[1];

        // The second request is answered first, with an out-of-order-sequence error.
        this.client.respondToRequest(secondClientRequest, this.produceResponse(this.tp0, -1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1));
        this.sender.runOnce();
        Deque<ProducerBatch> queuedBatches = this.accumulator.getDeque(this.tp0);
        Assertions.assertEquals(1, queuedBatches.size());
        Assertions.assertEquals(1, queuedBatches.peekFirst().baseSequence());
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // The first request is then answered with NOT_LEADER_OR_FOLLOWER.
        this.client.respondToRequest(firstClientRequest, this.produceResponse(this.tp0, -1L, Errors.NOT_LEADER_OR_FOLLOWER, -1));
        this.sender.runOnce();
        // Both batches are queued again, ordered by base sequence.
        Assertions.assertEquals(2, queuedBatches.size());
        Assertions.assertEquals(0, queuedBatches.peekFirst().baseSequence());
        Assertions.assertEquals(1, queuedBatches.peekLast().baseSequence());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());

        // Two further sender passes keep exactly one request in flight.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));

        // Success for sequence 0 completes request1.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(0L, request1.get().offset());

        // The next pass sends one request; success for sequence 1 completes request2.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.runOnce();
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1L, request2.get().offset());
    }

    /**
     * Puts two requests for {@code tp0} in flight and answers the second one first with success,
     * then the first with {@code REQUEST_TIMED_OUT}. The test asserts that the first batch is
     * queued again with base sequence 0, is sent once more and completes at offset 0, while the
     * last acked sequence stays at 1 throughout.
     *
     * @throws Exception if any of the helpers invoked by the test fails
     */
    @Test
    public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Exception {
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First batch.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Second batch.
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        // Capture both in-flight requests so they can be answered in reverse order.
        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest) this.client.requests().toArray()[1];

        // The second request succeeds first.
        this.client.respondToRequest(secondClientRequest, this.produceResponse(this.tp0, 1L, Errors.NONE, 1));
        this.sender.runOnce();
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1L, request2.get().offset());
        Assertions.assertFalse(request1.isDone());
        Deque<ProducerBatch> queuedBatches = this.accumulator.getDeque(this.tp0);
        Assertions.assertEquals(0, queuedBatches.size());
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));

        // The first request then times out and its batch is queued again.
        this.client.respondToRequest(firstClientRequest, this.produceResponse(this.tp0, -1L, Errors.REQUEST_TIMED_OUT, -1));
        this.sender.runOnce();
        Assertions.assertEquals(1, queuedBatches.size());
        Assertions.assertEquals(0, queuedBatches.peekFirst().baseSequence());
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());

        // The next sender pass puts exactly one request in flight.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));

        // Success for sequence 0 completes request1; the last acked sequence remains 1.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(0, queuedBatches.size());
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(0L, request1.get().offset());
    }

    /**
     * A batch is appended but never sent: the clock is advanced and the node is
     * disconnected and put into backoff before the sender runs. The test expects the
     * record future to fail with {@code TimeoutException} and the transaction manager to
     * report no unresolved sequence for {@code tp0}.
     */
    @Test
    public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        Future<RecordMetadata> unsentRecord = this.appendToAccumulator(this.tp0, 0L, "key", "value");

        // Let time pass and make the node unavailable before the sender gets a chance to run.
        Node firstNode = this.metadata.fetch().nodes().get(0);
        this.time.sleep(10000L);
        this.client.disconnect(firstNode.idString());
        this.client.backoff(firstNode, 10L);
        this.sender.runOnce();

        this.assertFutureFailure(unsentRecord, TimeoutException.class);
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(this.tp0));
    }

    /**
     * Two batches are in flight; the first is answered with {@code REQUEST_TIMED_OUT} and
     * then expires with {@code TimeoutException}, which leaves {@code tp0} with an
     * unresolved sequence. Once the second batch succeeds, a further sender pass is
     * expected to clear the unresolved state, keep the producer id, and send the third
     * (so far sequence-less) batch.
     */
    @Test
    public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager, false, null);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // Send two batches, one second apart.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.time.sleep(1000L);
        Future<RecordMetadata> request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(2, this.sender.inFlightBatches(this.tp0).size());

        // The first batch gets a timeout error response.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.REQUEST_TIMED_OUT, -1L);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        // Advance time and take the node away; the first record is expected to expire.
        Node firstNode = this.metadata.fetch().nodes().get(0);
        this.time.sleep(600L);
        this.client.disconnect(firstNode.idString());
        this.client.backoff(firstNode, 10L);
        this.sender.runOnce();
        this.assertFutureFailure(request1, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(this.tp0));
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // Queue a third batch, then let the second one succeed.
        Future<RecordMetadata> request3 = this.appendToAccumulator(this.tp0);
        this.time.sleep(20L);
        Assertions.assertFalse(request2.isDone());
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
        this.sender.runOnce();
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1L, request2.get().offset());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // The third batch is still queued without a sequence while tp0 remains unresolved.
        Deque<ProducerBatch> queued = this.accumulator.getDeque(this.tp0);
        Assertions.assertEquals(1, queued.size());
        Assertions.assertFalse(queued.peekFirst().hasSequence());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(this.tp0));

        // One more pass resolves the sequence and drains the third batch.
        this.sender.runOnce();
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(this.tp0));
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0, queued.size());
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertFalse(request3.isDone());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * The first of two in-flight batches fails with {@code NOT_LEADER_OR_FOLLOWER} and then
     * expires, leaving an unresolved sequence. The second batch is subsequently rejected
     * with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}; the test expects the producer epoch to
     * become 1, the next sequence for {@code tp0} to be 1, and the unresolved state to be
     * cleared.
     */
    @Test
    public void testExpiryOfFirstBatchShouldCauseEpochBumpIfFutureBatchesFail() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // Send two batches, one second apart.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.time.sleep(1000L);
        Future<RecordMetadata> request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());

        // First batch receives an error response.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L);
        this.sender.runOnce();

        // Advance time and take the node away; the first record is expected to expire.
        Node firstNode = this.metadata.fetch().nodes().get(0);
        this.time.sleep(1000L);
        this.client.disconnect(firstNode.idString());
        this.client.backoff(firstNode, 10L);
        this.sender.runOnce();
        this.assertFutureFailure(request1, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(this.tp0));

        // Queue another batch, then fail the second one with an out-of-order sequence error.
        this.appendToAccumulator(this.tp0);
        this.time.sleep(20L);
        Assertions.assertFalse(request2.isDone());
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1L);
        this.sender.runOnce();

        Deque<ProducerBatch> queued = this.accumulator.getDeque(this.tp0);
        Assertions.assertEquals(2, queued.size());

        this.sender.runOnce();
        Assertions.assertEquals((short) 1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(this.tp0));
    }

    /**
     * Transactional variant: within an open transaction the first of two in-flight batches
     * fails with {@code NOT_LEADER_OR_FOLLOWER} and then expires, leaving {@code tp0} with an
     * unresolved sequence. After one more sender pass the transaction manager is expected
     * to report an abortable error.
     */
    @Test
    public void testUnresolvedSequencesAreNotFatal() throws Exception {
        // Explicit short cast: an int literal is not implicitly narrowed when passed as an
        // argument. NOTE(review): the epoch parameter is presumably a short (the epoch field is
        // compared as a short elsewhere in this class); the cast is harmless if it is wider.
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        this.apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
        TransactionManager txnManager = new TransactionManager(this.logContext, "testUnresolvedSeq", 60000, 100L, this.apiVersions);
        this.setupWithTransactionState(txnManager);
        this.doInitTransactions(txnManager, producerIdAndEpoch);

        // Open a transaction and register tp0 with it.
        txnManager.beginTransaction();
        txnManager.maybeAddPartition(this.tp0);
        this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(this.tp0, Errors.NONE)));
        this.sender.runOnce();

        // Send two batches, one second apart.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        this.time.sleep(1000L);
        this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());

        // First batch receives an error response.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L);
        this.sender.runOnce();

        // Advance time and take the node away; the first record is expected to expire.
        Node firstNode = this.metadata.fetch().nodes().get(0);
        this.time.sleep(1000L);
        this.client.disconnect(firstNode.idString());
        this.client.backoff(firstNode, 10L);
        this.sender.runOnce();
        this.assertFutureFailure(request1, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(this.tp0));

        // The following pass must leave the manager in an abortable-error state.
        this.sender.runOnce();
        Assertions.assertTrue(txnManager.hasAbortableError());
    }

    /**
     * The only sent batch fails with {@code NOT_LEADER_OR_FOLLOWER} and later expires.
     * The test expects {@code tp0} to be flagged as having an unresolved sequence with
     * nothing queued or in flight, and expects the following sender pass to move the
     * producer epoch to 1 and clear the unresolved flag.
     */
    @Test
    public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // Send one batch and answer it with an error.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0, 0L, "key", "value");
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L);
        this.sender.runOnce();
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());

        // Advance time and take the node away; the record is expected to expire.
        Node firstNode = this.metadata.fetch().nodes().get(0);
        this.time.sleep(15000L);
        this.client.disconnect(firstNode.idString());
        this.client.backoff(firstNode, 10L);
        this.sender.runOnce();
        this.assertFutureFailure(request1, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(this.tp0));
        Assertions.assertFalse(this.client.hasInFlightRequests());

        Deque<ProducerBatch> queued = this.accumulator.getDeque(this.tp0);
        Assertions.assertEquals(0, queued.size());
        Assertions.assertEquals(producerId, txnManager.producerIdAndEpoch().producerId);

        // One more pass: epoch is expected to be 1 and the sequence resolved.
        this.sender.runOnce();
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(this.tp0));
    }

    /**
     * Starts with the producer epoch at {@code Short.MAX_VALUE}. One produce request carrying
     * batches for {@code tp0} and {@code tp1} is answered with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER} for {@code tp0} and
     * {@code NOT_LEADER_OR_FOLLOWER} for {@code tp1}. After a new producer id
     * ({@code producerId + 1}) has been received, the {@code tp1} batch is expected to be
     * resent and to complete at offset 10.
     */
    @Test
    public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        int maxRetries = 10;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        // This local sender shadows the fixture's field of the same name on purpose.
        // The acks argument is cast explicitly because an int literal is not implicitly narrowed
        // when passed as an argument. NOTE(review): acks is presumably a short parameter; the
        // cast is harmless if it is wider.
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                1024 * 1024, (short) -1, maxRetries, senderMetrics, this.time, 5000, 50L,
                transactionManager, this.apiVersions);

        this.appendToAccumulator(this.tp0);
        Future<RecordMetadata> successfulResponse = this.appendToAccumulator(this.tp1);
        sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Insertion order matters for the response, hence a LinkedHashMap.
        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
        responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(this.produceResponse(responses));
        sender.runOnce();

        // A fresh producer id is handed out.
        this.prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
        sender.runOnce();
        Assertions.assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertFalse(successfulResponse.isDone());

        // The tp1 batch now succeeds.
        this.client.respond(this.produceResponse(this.tp1, 10L, Errors.NONE, -1));
        sender.runOnce();
        Assertions.assertTrue(successfulResponse.isDone());
        Assertions.assertEquals(10L, successfulResponse.get().offset());
        Assertions.assertEquals(1L, transactionManager.sequenceNumber(this.tp1).longValue());
    }

    /**
     * Starts with the producer epoch at {@code Short.MAX_VALUE}, fails one produce request
     * ({@code OUT_OF_ORDER_SEQUENCE_NUMBER} for {@code tp0}, {@code NOT_LEADER_OR_FOLLOWER}
     * for {@code tp1}) and initiates a graceful close. The test then keeps preparing
     * InitProducerId responses and running the sender until the accumulator reports no
     * undrained batches, failing after 5 seconds.
     */
    @Test
    public void testCloseWithProducerIdReset() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        // This local sender shadows the fixture's field of the same name on purpose.
        // The acks argument is cast explicitly because an int literal is not implicitly narrowed
        // when passed as an argument. NOTE(review): acks is presumably a short parameter; the
        // cast is harmless if it is wider.
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                1024 * 1024, (short) -1, 10, senderMetrics, this.time, 5000, 50L,
                transactionManager, this.apiVersions);

        this.appendToAccumulator(this.tp0);
        this.appendToAccumulator(this.tp1);
        sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Insertion order matters for the response, hence a LinkedHashMap.
        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
        responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(this.produceResponse(responses));

        sender.initiateClose();
        sender.runOnce();

        // Keep feeding InitProducerId responses until everything has been drained.
        TestUtils.waitForCondition(() -> {
            this.prepareInitProducerResponse(Errors.NONE, producerId + 1, (short) 1);
            sender.runOnce();
            return !this.accumulator.hasUndrained();
        }, 5000L, "Failed to drain batches");
    }

    /**
     * Starts with the producer epoch at {@code Short.MAX_VALUE}, fails one produce request
     * ({@code OUT_OF_ORDER_SEQUENCE_NUMBER} for {@code tp0}, {@code NOT_LEADER_OR_FOLLOWER}
     * for {@code tp1}) and then force-closes the sender. After the sender's main loop has
     * run, no batch may remain undrained and the {@code tp1} future must be done.
     */
    @Test
    public void testForceCloseWithProducerIdReset() throws Exception {
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(1L, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        // This local sender shadows the fixture's field of the same name on purpose.
        // The acks argument is cast explicitly because an int literal is not implicitly narrowed
        // when passed as an argument. NOTE(review): acks is presumably a short parameter; the
        // cast is harmless if it is wider.
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                1024 * 1024, (short) -1, 10, senderMetrics, this.time, 5000, 50L,
                transactionManager, this.apiVersions);

        // The tp0 future was never inspected by this test, so it is no longer kept in a local.
        this.appendToAccumulator(this.tp0);
        Future<RecordMetadata> successfulResponse = this.appendToAccumulator(this.tp1);
        sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Insertion order matters for the response, hence a LinkedHashMap.
        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
        responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(this.produceResponse(responses));
        sender.runOnce();

        // Force-close, run one pass, then run the main loop.
        sender.forceClose();
        sender.runOnce();
        sender.run();
        Assertions.assertFalse(this.accumulator.hasUndrained(), "Pending batches are not aborted.");
        Assertions.assertTrue(successfulResponse.isDone());
    }

    /**
     * One produce request carrying batches for {@code tp0} and {@code tp1} is answered with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER} for {@code tp0} and
     * {@code NOT_LEADER_OR_FOLLOWER} for {@code tp1}. The test expects the epoch to become 1,
     * then lets the {@code tp1} batch fail once more before it finally succeeds, and checks
     * that the next sequence number for {@code tp1} is 1.
     */
    @Test
    public void testBatchesDrainedWithOldProducerIdShouldSucceedOnSubsequentRetry() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        int maxRetries = 10;
        Metrics m = new Metrics();
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
        // This local sender shadows the fixture's field of the same name on purpose.
        // The acks argument is cast explicitly because an int literal is not implicitly narrowed
        // when passed as an argument. NOTE(review): acks is presumably a short parameter; the
        // cast is harmless if it is wider.
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                1024 * 1024, (short) -1, maxRetries, senderMetrics, this.time, 5000, 50L,
                transactionManager, this.apiVersions);

        Future<RecordMetadata> outOfOrderResponse = this.appendToAccumulator(this.tp0);
        Future<RecordMetadata> successfulResponse = this.appendToAccumulator(this.tp1);
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Insertion order matters for the response, hence a LinkedHashMap.
        Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
        responses.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        responses.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(this.produceResponse(responses));
        sender.runOnce();
        Assertions.assertFalse(outOfOrderResponse.isDone());

        sender.runOnce();
        Assertions.assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertFalse(successfulResponse.isDone());

        // The tp1 batch fails once more ...
        this.client.respond(this.produceResponse(this.tp1, 0L, Errors.NOT_LEADER_OR_FOLLOWER, -1));
        sender.runOnce();
        Assertions.assertFalse(successfulResponse.isDone());

        // ... and succeeds on the following attempt.
        sender.runOnce();
        this.client.respond(this.produceResponse(this.tp1, 0L, Errors.NONE, -1));
        sender.runOnce();
        Assertions.assertTrue(successfulResponse.isDone());
        Assertions.assertEquals(1, transactionManager.sequenceNumber(this.tp1).intValue());
    }

    /**
     * Two produce requests for {@code tp0} are in flight. The second is acknowledged at
     * offset 1000 first; the first is then answered with {@code DUPLICATE_SEQUENCE_NUMBER}.
     * The test expects the last acked sequence/offset to stay at 1/1000 and the first
     * record's metadata to carry no offset (reported as -1).
     */
    @Test
    public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // First batch goes in flight.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String nodeId = this.client.requests().peek().destination();
        Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));

        // Second batch goes in flight.
        Future<RecordMetadata> request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request1.isDone());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()));

        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest) this.client.requests().toArray()[1];

        // The second request is acknowledged first.
        this.client.respondToRequest(secondClientRequest, this.produceResponse(this.tp0, 1000L, Errors.NONE, 0));
        this.sender.runOnce();
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(this.tp0));
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));

        // The first request is reported as a duplicate.
        this.client.respondToRequest(firstClientRequest, this.produceResponse(this.tp0, -1L, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(this.tp0));
        Assertions.assertFalse(this.client.hasInFlightRequests());

        RecordMetadata unknownMetadata = request1.get();
        Assertions.assertFalse(unknownMetadata.hasOffset());
        Assertions.assertEquals(-1L, unknownMetadata.offset());
    }

    /**
     * Transactional variant: after one acknowledged batch, two further batches are sent and
     * answered with {@code UNKNOWN_PRODUCER_ID} together with a log start offset (1010) that
     * is above the last acked offset (1000). The test expects the last acked sequence to be
     * cleared and the next sequence for {@code tp0} to be 2, then the resend to be
     * acknowledged so that the last record lands at offset 1012.
     */
    @Test
    public void testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
        final long producerId = 343434L;
        TransactionManager transactionManager = new TransactionManager(this.logContext, "testUnresolvedSeq", 60000, 100L, this.apiVersions);
        this.setupWithTransactionState(transactionManager);
        // Explicit short cast: an int literal is not implicitly narrowed when passed as an
        // argument. NOTE(review): the epoch parameter is presumably a short; the cast is
        // harmless if it is wider.
        this.doInitTransactions(transactionManager, new ProducerIdAndEpoch(producerId, (short) 0));
        Assertions.assertTrue(transactionManager.hasProducerId());

        // Open a transaction and register tp0 with it.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(this.tp0);
        this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(this.tp0, Errors.NONE)));
        this.sender.runOnce();
        Assertions.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());

        // First batch is sent and acknowledged at offset 1000.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(1000L, request1.get().offset());
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(this.tp0));

        // Two more records are appended and sent.
        this.appendToAccumulator(this.tp0);
        Future<RecordMetadata> request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request2.isDone());

        // The broker no longer knows the producer; the log start offset has moved to 1010.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertFalse(this.client.hasInFlightRequests());

        // The resend (expected at sequence 0) is acknowledged.
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1012L, request2.get().offset());
        Assertions.assertEquals(OptionalLong.of(1012L), transactionManager.lastAckedOffset(this.tp0));
    }

    /**
     * Non-transactional counterpart of the retention-limit scenario: after one acknowledged
     * batch, two further batches are answered with {@code UNKNOWN_PRODUCER_ID} and a log
     * start offset (1010) above the last acked offset (1000). The test expects the epoch to
     * become 1 with a request already back in flight, and the resend to be acknowledged so
     * that the last record lands at offset 1012.
     */
    @Test
    public void testIdempotentUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // First batch is sent and acknowledged at offset 1000.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(1000L, request1.get().offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(this.tp0));

        // Two more records are appended and sent.
        this.appendToAccumulator(this.tp0);
        Future<RecordMetadata> request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(3L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request2.isDone());

        // The broker no longer knows the producer; two passes follow before the state is checked.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertTrue(this.client.hasInFlightRequests());
        Assertions.assertEquals((short) 1, txnManager.producerIdAndEpoch().epoch);

        // The resend (expected at sequence 0) is acknowledged.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1012L, request2.get().offset());
        Assertions.assertEquals(OptionalLong.of(1012L), txnManager.lastAckedOffset(this.tp0));
    }

    /**
     * After one acknowledged batch, a second batch is answered with
     * {@code UNKNOWN_PRODUCER_ID} and a log start offset of -1. The test expects the
     * sequence bookkeeping to stay untouched (last acked 0, next 2) and the batch to be
     * retried with its original sequence 1, completing at offset 1011.
     */
    @Test
    public void testUnknownProducerErrorShouldBeRetriedWhenLogStartOffsetIsUnknown() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = this.createTransactionManager();
        this.setupWithTransactionState(txnManager);
        this.prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // First batch is sent and acknowledged at offset 1000.
        Future<RecordMetadata> request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assertions.assertTrue(request1.isDone());
        Assertions.assertEquals(1000L, request1.get().offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(this.tp0));

        // Second batch is sent.
        Future<RecordMetadata> request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(request2.isDone());

        // UNKNOWN_PRODUCER_ID with a log start offset of -1: sequence state must not change.
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(request2.isDone());
        Assertions.assertFalse(this.client.hasInFlightRequests());

        // The retry goes out with sequence 1 again and succeeds.
        this.sender.runOnce();
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertTrue(request2.isDone());
        Assertions.assertEquals(1011L, request2.get().offset());
        Assertions.assertEquals(OptionalLong.of(1011L), txnManager.lastAckedOffset(this.tp0));
    }

    /**
     * Exercises an idempotent producer with three sequential batches on {@code tp0} where the
     * second and third batches both receive {@code UNKNOWN_PRODUCER_ID}.
     *
     * <p>The assertions pin the following observable sequence: the first batch is acknowledged at
     * offset 1000; after the error on the second batch the last acknowledged sequence is cleared
     * and the producer epoch is 1; the two pending batches are then re-sent carrying base
     * sequences 0 and 1 and are acknowledged at offsets 1011 and 1012.
     */
    @Test
    public void testUnknownProducerErrorShouldBeRetriedForFutureBatchesWhenFirstFails() throws Exception {
        // NOTE(review): this local is unused; the decompiler inlined the literal below.
        long producerId = 343434L;
        TransactionManager transactionManager = this.createTransactionManager();
        this.setupWithTransactionState(transactionManager);
        this.prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue((boolean)transactionManager.hasProducerId());
        Assertions.assertEquals((long)0L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        // Batch 1: append, send, and expect it in flight with the next sequence advanced to 1.
        FutureRecordMetadata request1 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals((int)1, (int)this.client.inFlightRequestCount());
        Assertions.assertEquals((long)1L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals((Object)OptionalInt.empty(), (Object)transactionManager.lastAckedSequence(this.tp0));
        // Acknowledge sequence 0 at offset 1000 with log start offset 10.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assertions.assertTrue((boolean)request1.isDone());
        Assertions.assertEquals((long)1000L, (long)((RecordMetadata)request1.get()).offset());
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals((Object)OptionalLong.of(1000L), (Object)transactionManager.lastAckedOffset(this.tp0));
        // Batch 2: sent with sequence 1.
        FutureRecordMetadata request2 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals((long)2L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)transactionManager.lastAckedSequence(this.tp0));
        // Batch 3: sent with sequence 2 while batch 2 is still outstanding (two requests in flight).
        FutureRecordMetadata request3 = this.appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals((long)3L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse((boolean)request2.isDone());
        Assertions.assertFalse((boolean)request3.isDone());
        Assertions.assertEquals((int)2, (int)this.client.inFlightRequestCount());
        // Fail sequence 1 with UNKNOWN_PRODUCER_ID (log start offset 1010, above the last acked offset 1000).
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        // Two sender passes: presumably one to process the error and one to re-send - TODO confirm.
        this.sender.runOnce();
        this.sender.runOnce();
        // Expected state afterwards: acked sequence cleared, next sequence back at 2,
        // both futures still pending, two requests in flight, producer epoch 1.
        Assertions.assertEquals((Object)OptionalInt.empty(), (Object)transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals((long)2L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertFalse((boolean)request2.isDone());
        Assertions.assertFalse((boolean)request3.isDone());
        Assertions.assertEquals((int)2, (int)this.client.inFlightRequestCount());
        Assertions.assertEquals((short)1, (short)transactionManager.producerIdAndEpoch().epoch);
        // The request still carrying the old sequence 2 also fails with UNKNOWN_PRODUCER_ID.
        this.sendIdempotentProducerResponse(2, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.runOnce();
        Assertions.assertEquals((int)1, (int)this.client.inFlightRequestCount());
        Assertions.assertEquals((Object)OptionalInt.empty(), (Object)transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals((long)2L, (long)transactionManager.sequenceNumber(this.tp0).longValue());
        // The remaining in-flight request must carry base sequence 0; acknowledge it at offset 1011.
        this.sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.runOnce();
        Assertions.assertTrue((boolean)request2.isDone());
        Assertions.assertFalse((boolean)request3.isDone());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertEquals((Object)OptionalInt.of(0), (Object)transactionManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals((long)1011L, (long)((RecordMetadata)request2.get()).offset());
        Assertions.assertEquals((Object)OptionalLong.of(1011L), (Object)transactionManager.lastAckedOffset(this.tp0));
        // Next pass sends the last pending batch, expected with base sequence 1; acknowledge at 1012.
        this.sender.runOnce();
        Assertions.assertEquals((int)1, (int)this.client.inFlightRequestCount());
        this.sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1012L, 1010L);
        this.sender.runOnce();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertTrue((boolean)request3.isDone());
        Assertions.assertEquals((long)1012L, (long)((RecordMetadata)request3.get()).offset());
        Assertions.assertEquals((Object)OptionalLong.of(1012L), (Object)transactionManager.lastAckedOffset(this.tp0));
    }

    /**
     * Sends two batches on {@code tp0}; the first is acknowledged at offset 1000 and the second
     * receives {@code UNKNOWN_PRODUCER_ID} with a log start offset (10) below that offset.
     *
     * <p>After two further sender passes the assertions expect the producer epoch to be 1, the
     * last acknowledged sequence to be cleared, and the second send to be still pending.
     */
    @Test
    public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch goes out with sequence 0.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // It is acknowledged at offset 1000 with log start offset 10.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
        sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertEquals(1000L, firstSend.get().offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));

        // Second batch goes out with sequence 1 and stays pending.
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(secondSend.isDone());

        // UNKNOWN_PRODUCER_ID while the log start offset is still 10.
        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(secondSend.isDone());
    }

    /**
     * Convenience overload that forwards to the five-argument variant with a log start offset
     * of {@code -1}.
     *
     * @param expectedSequence base sequence the matched produce request must carry
     * @param tp               partition the response is built for
     * @param responseError    error code placed in the response
     * @param responseOffset   offset placed in the response
     */
    void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
        final long defaultLogStartOffset = -1L;
        sendIdempotentProducerResponse(expectedSequence, tp, responseError, responseOffset, defaultLogStartOffset);
    }

    /**
     * Convenience overload that forwards to the six-argument variant with an expected epoch of
     * {@code -1}, which that variant treats as "do not check the epoch".
     *
     * @param expectedSequence base sequence the matched produce request must carry
     * @param tp               partition the response is built for
     * @param responseError    error code placed in the response
     * @param responseOffset   offset placed in the response
     * @param logStartOffset   log start offset placed in the response
     */
    void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) {
        final int skipEpochCheck = -1;
        sendIdempotentProducerResponse(skipEpochCheck, expectedSequence, tp, responseError, responseOffset, logStartOffset);
    }

    /**
     * Hands the mock client a produce response for {@code tp} together with a matcher that
     * validates the request it is paired with.
     *
     * <p>The matcher asserts that the request has idempotent records, that it holds exactly one
     * batch for {@code tp}, that the batch's base sequence equals {@code expectedSequence}, and -
     * only when {@code expectedEpoch} is greater than {@code -1} - that its producer epoch equals
     * {@code expectedEpoch}.
     *
     * @param expectedEpoch    producer epoch to require, or a value of {@code -1} or lower to skip the check
     * @param expectedSequence base sequence the batch must carry
     * @param tp               partition whose records are inspected and for which the response is built
     * @param responseError    error code placed in the response
     * @param responseOffset   offset placed in the response
     * @param logStartOffset   log start offset placed in the response
     */
    void sendIdempotentProducerResponse(int expectedEpoch, int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset, long logStartOffset) {
        AbstractResponse response = produceResponse(tp, responseOffset, responseError, 0, logStartOffset, null);
        client.respond(request -> {
            ProduceRequest produceRequest = (ProduceRequest) request;
            Assertions.assertTrue(RequestTestUtils.hasIdempotentRecords(produceRequest));

            MemoryRecords partitionData = SenderTest.partitionRecords(produceRequest).get(tp);
            Iterator<? extends RecordBatch> batches = partitionData.batches().iterator();
            RecordBatch onlyBatch = batches.next();
            Assertions.assertFalse(batches.hasNext());

            boolean checkEpoch = expectedEpoch > -1;
            if (checkEpoch) {
                Assertions.assertEquals((short) expectedEpoch, onlyBatch.producerEpoch());
            }
            Assertions.assertEquals(expectedSequence, onlyBatch.baseSequence());
            return true;
        }, response);
    }

    /**
     * Expects a {@code CLUSTER_AUTHORIZATION_FAILED} produce response to fail the record future
     * with {@link ClusterAuthorizationException}, put the transaction manager into a fatal error
     * state, and make subsequent sends fail with the same exception type.
     */
    @Test
    public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        // Queue the failing response before the request is sent.
        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        client.prepareResponse(
            request -> request instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            produceResponse(tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        sender.runOnce();

        assertFutureFailure(sendFuture, ClusterAuthorizationException.class);
        Assertions.assertTrue(txnManager.hasFatalError());
        assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * With one batch in flight on each of {@code tp0} and {@code tp1}, a
     * {@code CLUSTER_AUTHORIZATION_FAILED} response for {@code tp0} is expected to fail the first
     * future immediately and the second future on the next sender pass. A later successful
     * response for {@code tp1} is then delivered and must be processed without error.
     */
    @Test
    public void testCancelInFlightRequestAfterFatalError() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        // One batch per partition, each sent by its own sender pass.
        FutureRecordMetadata tp0Future = appendToAccumulator(tp0);
        sender.runOnce();
        FutureRecordMetadata tp1Future = appendToAccumulator(tp1);
        sender.runOnce();

        // Fatal authorization failure on the tp0 request.
        client.respond(
            request -> request instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            produceResponse(tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        sender.runOnce();
        Assertions.assertTrue(txnManager.hasFatalError());
        assertFutureFailure(tp0Future, ClusterAuthorizationException.class);

        // The other in-flight batch fails on the following pass.
        sender.runOnce();
        assertFutureFailure(tp1Future, ClusterAuthorizationException.class);

        // A late success for tp1 should simply be processed by the sender.
        client.respond(
            request -> request instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            produceResponse(tp1, 0L, Errors.NONE, 0));
        sender.runOnce();
    }

    /**
     * Expects an {@code UNSUPPORTED_FOR_MESSAGE_FORMAT} produce response to fail the record future
     * with {@link UnsupportedForMessageFormatException} while leaving the transaction manager
     * without an error.
     */
    @Test
    public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        client.prepareResponse(
            request -> request instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            produceResponse(tp0, -1L, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
        sender.runOnce();

        assertFutureFailure(sendFuture, UnsupportedForMessageFormatException.class);
        // Unlike the authorization failure case, this error is not recorded on the manager.
        Assertions.assertFalse(txnManager.hasError());
    }

    /**
     * Expects an unsupported-version response to a produce request to fail the record future with
     * {@link UnsupportedVersionException}, put the transaction manager into a fatal error state,
     * and make subsequent sends fail with the same exception type.
     */
    @Test
    public void testUnsupportedVersionInProduceRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        client.prepareUnsupportedVersionResponse(
            request -> request instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request));
        sender.runOnce();

        assertFutureFailure(sendFuture, UnsupportedVersionException.class);
        Assertions.assertTrue(txnManager.hasFatalError());
        assertSendFailure(UnsupportedVersionException.class);
    }

    /**
     * Sends a single idempotent batch through a locally constructed {@link Sender} and checks
     * that the request carries base sequence 0, the initialised producer id and epoch 0, and that
     * after the successful response the last acknowledged sequence is 0 and the next sequence is 1.
     *
     * <p>The locally created {@link Metrics} instance is scoped with try-with-resources so it is
     * closed when the test finishes, matching how {@code testSplitBatchAndSend} manages its own
     * instance.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testSequenceNumberIncrement() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = createTransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        int maxRetries = 10;
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // Deliberately shadows the shared sender field: this test drives its own instance.
            // The -1 argument is cast explicitly because an int literal does not narrow to a
            // short parameter in a method call (presumably this is the acks setting - TODO confirm).
            Sender sender = new Sender(logContext, client, metadata, accumulator, true, 0x100000, (short) -1, maxRetries,
                senderMetrics, time, 5000, 50L, transactionManager, apiVersions);

            FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
            client.prepareResponse(body -> {
                if (!(body instanceof ProduceRequest)) {
                    return false;
                }
                ProduceRequest request = (ProduceRequest) body;
                MemoryRecords records = SenderTest.partitionRecords(request).get(tp0);
                Iterator<? extends RecordBatch> batchIterator = records.batches().iterator();
                Assertions.assertTrue(batchIterator.hasNext());
                RecordBatch batch = batchIterator.next();
                // Exactly one batch is expected in the request.
                Assertions.assertFalse(batchIterator.hasNext());
                Assertions.assertEquals(0, batch.baseSequence());
                Assertions.assertEquals(producerId, batch.producerId());
                Assertions.assertEquals(0, batch.producerEpoch());
                return true;
            }, produceResponse(tp0, 0L, Errors.NONE, 0));

            // Three passes, as in the original test; presumably connect, send, receive - TODO confirm.
            sender.runOnce();
            sender.runOnce();
            sender.runOnce();

            Assertions.assertTrue(responseFuture.isDone());
            Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
            Assertions.assertEquals(1L, transactionManager.sequenceNumber(tp0).intValue());
        }
    }

    /**
     * Starts with producer id 343434 at epoch {@code Short.MAX_VALUE}, sends one batch, then
     * disconnects the destination node so the in-flight request is dropped. After a second
     * InitProducerId exchange returning producer id 343435, the test expects the batch to be in
     * flight again, its future to be pending, and the next sequence number to be 1.
     *
     * <p>The locally created {@link Metrics} instance is scoped with try-with-resources so it is
     * closed when the test finishes, matching how {@code testSplitBatchAndSend} manages its own
     * instance.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testRetryWhenProducerIdChanges() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = createTransactionManager();
        setupWithTransactionState(transactionManager);
        // NOTE(review): the maximal epoch presumably forces a new producer id on the next bump - confirm.
        prepareAndReceiveInitProducerId(producerId, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        int maxRetries = 10;
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // Deliberately shadows the shared sender field: this test drives its own instance.
            // The -1 argument is cast explicitly because an int literal does not narrow to a
            // short parameter in a method call (presumably this is the acks setting - TODO confirm).
            Sender sender = new Sender(logContext, client, metadata, accumulator, true, 0x100000, (short) -1, maxRetries,
                senderMetrics, time, 5000, 50L, transactionManager, apiVersions);

            FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
            sender.runOnce();
            sender.runOnce();

            String id = client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assertions.assertEquals(1, client.inFlightRequestCount());
            Assertions.assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true");

            // Drop the connection: the in-flight request disappears and the node is no longer ready.
            client.disconnect(id);
            Assertions.assertEquals(0, client.inFlightRequestCount());
            Assertions.assertFalse(client.isReady(node, time.milliseconds()), "Client ready status should be false");

            sender.runOnce();
            sender.runOnce();

            // Hand out a different producer id, then let the sender run once more.
            prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
            sender.runOnce();

            Assertions.assertEquals(1, client.inFlightRequestCount(), "Expected requests to be retried after pid change");
            Assertions.assertFalse(responseFuture.isDone());
            Assertions.assertEquals(1L, transactionManager.sequenceNumber(tp0).intValue());
        }
    }

    /**
     * Sends one idempotent batch and answers it with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}. After
     * two further sender passes the test expects the future to be pending, the batch to be in
     * flight again, and the producer epoch to be 1.
     *
     * <p>The locally created {@link Metrics} instance is scoped with try-with-resources so it is
     * closed when the test finishes, matching how {@code testSplitBatchAndSend} manages its own
     * instance.
     *
     * @throws InterruptedException if appending to the accumulator is interrupted
     */
    @Test
    public void testBumpEpochWhenOutOfOrderSequenceReceived() throws InterruptedException {
        final long producerId = 343434L;
        TransactionManager transactionManager = createTransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasProducerId());

        int maxRetries = 10;
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // Deliberately shadows the shared sender field: this test drives its own instance.
            // The -1 argument is cast explicitly because an int literal does not narrow to a
            // short parameter in a method call (presumably this is the acks setting - TODO confirm).
            Sender sender = new Sender(logContext, client, metadata, accumulator, true, 0x100000, (short) -1, maxRetries,
                senderMetrics, time, 5000, 50L, transactionManager, apiVersions);

            FutureRecordMetadata responseFuture = appendToAccumulator(tp0);
            sender.runOnce();
            sender.runOnce();
            Assertions.assertEquals(1, client.inFlightRequestCount());
            Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

            client.respond(produceResponse(tp0, 0L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
            sender.runOnce();
            sender.runOnce();

            // The batch is not failed; it is in flight again under a bumped epoch.
            Assertions.assertFalse(responseFuture.isDone());
            Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
            Assertions.assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
        }
    }

    /**
     * Runs the shared split-batch scenario with an idempotent (non-transactional) transaction
     * manager initialised to producer id 123456.
     */
    @Test
    public void testIdempotentSplitBatchAndSend() throws Exception {
        final long producerId = 123456L;
        TopicPartition partition = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager manager = createTransactionManager();
        // Explicit short cast: an int literal does not narrow implicitly in a constructor call.
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(producerId, (short) 0);

        setupWithTransactionState(manager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertTrue(manager.hasProducerId());

        testSplitBatchAndSend(manager, idAndEpoch, partition);
    }

    /**
     * Runs the shared split-batch scenario with a transactional transaction manager: transactions
     * are initialised, a transaction is begun, and the partition is added to it (with a successful
     * {@code AddPartitionsToTxnResponse} prepared) before the scenario starts.
     */
    @Test
    public void testTransactionalSplitBatchAndSend() throws Exception {
        // Explicit short cast: an int literal does not narrow implicitly in a constructor call.
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        TopicPartition partition = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager manager =
            new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100L, apiVersions);

        setupWithTransactionState(manager);
        doInitTransactions(manager, idAndEpoch);

        manager.beginTransaction();
        manager.maybeAddPartition(partition);
        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(partition, Errors.NONE)));
        sender.runOnce();

        testSplitBatchAndSend(manager, idAndEpoch, partition);
    }

    /**
     * Shared scenario behind the idempotent and transactional split-batch tests.
     *
     * <p>Two records are appended for {@code tp} and sent in one produce request, which is
     * answered with {@code MESSAGE_TOO_LARGE}. The test then expects two further produce requests,
     * each answered successfully and each completing one of the two futures (offsets 0 and 1),
     * with the next sequence number staying at 2 throughout and the batch-split-rate metric
     * ending above zero.
     *
     * @param txnManager         transaction manager wired into the accumulator and sender built here
     * @param producerIdAndEpoch producer id and epoch passed to the produce-request matcher
     * @param tp                 partition the records are appended to
     * @throws Exception if any step of the scenario fails unexpectedly
     */
    private void testSplitBatchAndSend(TransactionManager txnManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition tp) throws Exception {
        int maxRetries = 1;
        String topic = tp.topic();
        int deliveryTimeoutMs = 3000;
        long totalSize = 0x100000L;
        String metricGrpName = "producer-metrics";
        // Seed the GZIP compression-ratio estimate for this topic at 0.2.
        // NOTE(review): this writes to a static estimator and is not reset here - confirm no cross-test impact.
        CompressionRatioEstimator.setEstimation((String)topic, (CompressionType)CompressionType.GZIP, (float)0.2f);
        try (Metrics m = new Metrics();){
            // Replace the shared accumulator with a GZIP one bound to txnManager.
            // NOTE(review): the BufferPool is given this.metrics rather than the local m - confirm intended.
            this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.GZIP, 0, 0L, deliveryTimeoutMs, m, metricGrpName, (Time)this.time, new ApiVersions(), txnManager, new BufferPool(totalSize, this.batchSize, this.metrics, (Time)this.time, "producer-internal-metrics"));
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            // Local sender (shadows the field) so the test controls retries and metrics.
            Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, true, 0x100000, -1, maxRetries, senderMetrics, (Time)this.time, 5000, 1000L, txnManager, new ApiVersions());
            MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
            this.client.prepareMetadataUpdate(metadataUpdate1);
            long nowMs = this.time.milliseconds();
            Cluster cluster = TestUtils.singletonCluster();
            // Two records for the same partition, each with a value of batchSize / 2 bytes.
            FutureRecordMetadata f1 = this.accumulator.append((String)tp.topic(), (int)tp.partition(), (long)0L, (byte[])"key1".getBytes(), (byte[])new byte[this.batchSize / 2], null, null, (long)1000L, (boolean)false, (long)nowMs, (Cluster)cluster).future;
            FutureRecordMetadata f2 = this.accumulator.append((String)tp.topic(), (int)tp.partition(), (long)0L, (byte[])"key2".getBytes(), (byte[])new byte[this.batchSize / 2], null, null, (long)1000L, (boolean)false, (long)nowMs, (Cluster)cluster).future;
            // Two passes, after which a single PRODUCE request is expected to be in flight.
            sender.runOnce();
            sender.runOnce();
            Assertions.assertEquals((long)2L, (long)txnManager.sequenceNumber(tp).longValue(), (String)"The next sequence should be 2");
            String id = this.client.requests().peek().destination();
            Assertions.assertEquals((Object)ApiKeys.PRODUCE, (Object)this.client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.valueOf(id).intValue(), "localhost", 0);
            Assertions.assertEquals((int)1, (int)this.client.inFlightRequestCount());
            Assertions.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()), (String)"Client ready status should be true");
            // Reject the request as too large; the same map is reused for the later responses.
            HashMap<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
            this.client.respond((AbstractResponse)new ProduceResponse(responseMap));
            sender.runOnce();
            Assertions.assertEquals((long)2L, (long)txnManager.sequenceNumber(tp).longValue(), (String)"The next sequence should be 2");
            // The estimate is expected to sit within 0.01 of (GZIP.rate - 0.005) after the rejection.
            Assertions.assertEquals((double)(CompressionType.GZIP.rate - 0.005f), (double)CompressionRatioEstimator.estimation((String)topic, (CompressionType)CompressionType.GZIP), (double)0.01);
            // Next pass puts one PRODUCE request in flight again; neither future is done yet.
            sender.runOnce();
            Assertions.assertEquals((long)2L, (long)txnManager.sequenceNumber(tp).longValue(), (String)"The next sequence number should be 2");
            Assertions.assertFalse((boolean)f1.isDone(), (String)"The future shouldn't have been done.");
            Assertions.assertFalse((boolean)f2.isDone(), (String)"The future shouldn't have been done.");
            id = this.client.requests().peek().destination();
            Assertions.assertEquals((Object)ApiKeys.PRODUCE, (Object)this.client.requests().peek().requestBuilder().apiKey());
            node = new Node(Integer.valueOf(id).intValue(), "localhost", 0);
            Assertions.assertEquals((int)1, (int)this.client.inFlightRequestCount());
            Assertions.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()), (String)"Client ready status should be true");
            // Succeed the first request at offset 0; the matcher's third argument (0) is
            // presumably the expected base sequence - TODO confirm against produceRequestMatcher.
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
            this.client.respond(this.produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()), (AbstractResponse)new ProduceResponse(responseMap));
            sender.runOnce();
            Assertions.assertTrue((boolean)f1.isDone(), (String)"The future should have been done.");
            Assertions.assertEquals((long)2L, (long)txnManager.sequenceNumber(tp).longValue(), (String)"The next sequence number should still be 2");
            Assertions.assertEquals((Object)OptionalInt.of(0), (Object)txnManager.lastAckedSequence(tp), (String)"The last ack'd sequence number should be 0");
            Assertions.assertFalse((boolean)f2.isDone(), (String)"The future shouldn't have been done.");
            Assertions.assertEquals((long)0L, (long)((RecordMetadata)f1.get()).offset(), (String)"Offset of the first message should be 0");
            // Another pass puts the second PRODUCE request in flight.
            sender.runOnce();
            id = this.client.requests().peek().destination();
            Assertions.assertEquals((Object)ApiKeys.PRODUCE, (Object)this.client.requests().peek().requestBuilder().apiKey());
            node = new Node(Integer.valueOf(id).intValue(), "localhost", 0);
            Assertions.assertEquals((int)1, (int)this.client.inFlightRequestCount());
            Assertions.assertTrue((boolean)this.client.isReady(node, this.time.milliseconds()), (String)"Client ready status should be true");
            // Succeed the second request at offset 1 (matcher argument 1, see note above).
            responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
            this.client.respond(this.produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()), (AbstractResponse)new ProduceResponse(responseMap));
            sender.runOnce();
            Assertions.assertTrue((boolean)f2.isDone(), (String)"The future should have been done.");
            Assertions.assertEquals((long)2L, (long)txnManager.sequenceNumber(tp).longValue(), (String)"The next sequence number should be 2");
            Assertions.assertEquals((Object)OptionalInt.of(1), (Object)txnManager.lastAckedSequence(tp), (String)"The last ack'd sequence number should be 1");
            // NOTE(review): this checks f2 (the second record); the message text says "first message".
            Assertions.assertEquals((long)1L, (long)((RecordMetadata)f2.get()).offset(), (String)"Offset of the first message should be 1");
            Assertions.assertTrue((boolean)this.accumulator.getDeque(tp).isEmpty(), (String)"There should be no batch in the accumulator");
            // The batch-split-rate metric must have recorded at least one split.
            Assertions.assertTrue(((Double)((KafkaMetric)m.metrics().get(senderMetrics.batchSplitRate)).metricValue() > 0.0 ? 1 : 0) != 0, (String)"There should be a split");
        }
    }

    /**
     * Uses a {@code MatchingBufferPool} to check buffer bookkeeping when an in-flight batch
     * outlives a 5000 ms clock advance: {@code allMatch()} is false while the batch is in flight
     * and true once the future has completed, and it stays true after a further sender pass.
     */
    @Test
    public void testNoDoubleDeallocation() throws Exception {
        long poolSize = 1024 * 1024;
        String poolMetricGroup = "producer-custom-metrics";
        MatchingBufferPool pool = new MatchingBufferPool(poolSize, batchSize, metrics, time, poolMetricGroup);
        setupWithTransactionState(null, false, pool);

        // One batch goes in flight.
        FutureRecordMetadata pendingSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        // Advance the clock; the buffer is still allocated until the sender runs again.
        time.sleep(5000L);
        Assertions.assertFalse(pool.allMatch());

        sender.runOnce();
        Assertions.assertTrue(pendingSend.isDone());
        Assertions.assertTrue(pool.allMatch(), "The batch should have been de-allocated");
        Assertions.assertTrue(pool.allMatch());

        // A further pass must leave the pool consistent and nothing in flight.
        sender.runOnce();
        Assertions.assertTrue(pool.allMatch(), "The batch should have been de-allocated");
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());
    }

    /**
     * Puts one batch in flight, queues a successful response, then advances the clock by 1500 ms
     * before the sender runs again. The test expects the batch to leave the in-flight set and its
     * future to fail with an {@link ExecutionException} caused by a {@code TimeoutException}.
     */
    @Test
    public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
        long deliveryTimeoutMs = 1500L;
        setupWithTransactionState(null, true, null);

        FutureRecordMetadata pendingSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one in-flight batch in accumulator");

        // A successful response is queued, but time moves on before the sender processes it.
        HashMap<TopicPartition, ProduceResponse.PartitionResponse> partitionResponses = new HashMap<>();
        partitionResponses.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
        client.respond(new ProduceResponse(partitionResponses));
        time.sleep(deliveryTimeoutMs);
        sender.runOnce();
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size(), "Expect zero in-flight batch in accumulator");

        try {
            pendingSend.get();
            Assertions.fail("The expired batch should throw a TimeoutException");
        } catch (ExecutionException expected) {
            Assertions.assertTrue(expected.getCause() instanceof TimeoutException);
        }
    }

    /**
     * Appends five records and answers the produce request with {@code INVALID_RECORD} plus
     * per-record errors for batch indices 0, 2 and 3 (index 3 without a message).
     *
     * <p>Every future must fail with a {@link KafkaException}. Indices 0 and 2 expect an
     * {@link InvalidRecordException} whose message is the index itself; index 3 expects an
     * {@link InvalidRecordException} with the default {@code INVALID_RECORD} message; all other
     * indices expect an exception whose class is exactly {@link KafkaException}.
     */
    @Test
    public void testRecordErrorPropagatedToApplication() throws InterruptedException {
        int recordCount = 5;
        setup();

        HashMap<Integer, FutureRecordMetadata> futuresByIndex = new HashMap<>(recordCount);
        for (int recordIndex = 0; recordIndex < recordCount; ++recordIndex) {
            futuresByIndex.put(recordIndex, appendToAccumulator(tp0));
        }

        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        OffsetAndError offsetAndError = new OffsetAndError(
            -1L,
            Errors.INVALID_RECORD,
            Arrays.asList(
                new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(0).setBatchIndexErrorMessage("0"),
                new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(2).setBatchIndexErrorMessage("2"),
                new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(3)));
        client.respond(produceResponse(Collections.singletonMap(tp0, offsetAndError)));
        sender.runOnce();

        for (Map.Entry<Integer, FutureRecordMetadata> entry : futuresByIndex.entrySet()) {
            FutureRecordMetadata future = entry.getValue();
            Assertions.assertTrue(future.isDone());
            KafkaException exception = TestUtils.assertFutureThrows(future, KafkaException.class);

            int index = entry.getKey();
            if (index == 0 || index == 2) {
                // Records with an explicit per-record message.
                Assertions.assertTrue(exception instanceof InvalidRecordException);
                Assertions.assertEquals(Integer.toString(index), exception.getMessage());
            } else if (index == 3) {
                // Record flagged without a message falls back to the error's default text.
                Assertions.assertTrue(exception instanceof InvalidRecordException);
                Assertions.assertEquals(Errors.INVALID_RECORD.message(), exception.getMessage());
            } else {
                // Records not named in the response get a plain KafkaException.
                Assertions.assertEquals(KafkaException.class, exception.getClass());
            }
        }
    }

    /**
     * With message ordering guaranteed, appends a second batch while the first is still in flight
     * and checks that it is not sent alongside it. Once the first request has been answered and
     * removed from the in-flight set, the next sender pass is expected to put exactly one request
     * and one batch in flight again.
     */
    @Test
    public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException {
        long deliveryTimeoutMs = 1500L;
        long halfTimeoutMs = deliveryTimeoutMs / 2L;
        setupWithTransactionState(null, true, null);

        // First batch in flight.
        appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        // Halfway through, a second batch is appended but must not join the in-flight set.
        time.sleep(halfTimeoutMs);
        appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        // After the second half of the timeout the first request is answered.
        time.sleep(halfTimeoutMs);
        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L, null));
        sender.runOnce();
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // The following pass sends one batch.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
    }

    /**
     * A batch whose error response is processed only after the delivery timeout has elapsed
     * must be completed and must not show up in flight again on later passes.
     */
    @Test
    public void testExpiredBatchDoesNotRetry() throws Exception {
        final long deliveryTimeoutMs = 1500L;
        setupWithTransactionState(null, false, null);

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // Let the timeout elapse before the NOT_LEADER_OR_FOLLOWER response is processed.
        time.sleep(deliveryTimeoutMs);
        client.respond((AbstractResponse) produceResponse(tp0, -1L, Errors.NOT_LEADER_OR_FOLLOWER, -1));
        sender.runOnce();
        Assertions.assertTrue(sendFuture.isDone());
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // Two further passes must leave nothing in flight.
        for (int pass = 0; pass < 2; pass++) {
            sender.runOnce();
            Assertions.assertEquals(0, client.inFlightRequestCount());
            Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());
        }
    }

    /**
     * A two-record batch that receives MESSAGE_TOO_LARGE after the delivery timeout has passed
     * must complete both futures and leave nothing in flight afterwards.
     */
    @Test
    public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
        final long deliveryTimeoutMs = 1500L;

        // Two records for the same partition are appended before the first send.
        FutureRecordMetadata firstFuture = appendToAccumulator(tp0);
        FutureRecordMetadata secondFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // Queue the error response, then advance the clock before it is processed.
        client.respond((AbstractResponse) produceResponse(tp0, -1L, Errors.MESSAGE_TOO_LARGE, -1));
        time.sleep(deliveryTimeoutMs);
        sender.runOnce();
        Assertions.assertTrue(firstFuture.isDone());
        Assertions.assertTrue(secondFuture.isDone());
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // One more pass must not send anything.
        sender.runOnce();
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());
    }

    /**
     * Checks the poll timeouts the sender passes to the client around a batch expiry.
     * The client is replaced by a Mockito spy so that the calls can be verified in order.
     */
    @Test
    public void testResetNextBatchExpiry() throws Exception {
        // Spy on a fresh MockClient; it must be installed before the sender is built by the setup call.
        this.client = (MockClient)Mockito.spy((Object)new MockClient((Time)this.time, (Metadata)this.metadata));
        this.setupWithTransactionState(null);
        this.appendToAccumulator(this.tp0, 0L, "key", "value");
        this.sender.runOnce();
        this.sender.runOnce();
        // Move the clock one millisecond past the accumulator's delivery timeout, then run once more.
        this.time.setCurrentTimeMs(this.time.milliseconds() + this.accumulator.getDeliveryTimeoutMs() + 1L);
        this.sender.runOnce();
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.client});
        // Expected order: ready -> newClientRequest -> send (each at least once) ...
        ((MockClient)inOrder.verify((Object)this.client, Mockito.atLeastOnce())).ready((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((MockClient)inOrder.verify((Object)this.client, Mockito.atLeastOnce())).newClientRequest(ArgumentMatchers.anyString(), (AbstractRequest.Builder)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyInt(), (RequestCompletionHandler)ArgumentMatchers.any());
        ((MockClient)inOrder.verify((Object)this.client, Mockito.atLeastOnce())).send((ClientRequest)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        // ... followed by three polls: timeout 0, timeout equal to the delivery timeout, then a timeout >= 1.
        ((MockClient)inOrder.verify((Object)this.client)).poll(ArgumentMatchers.eq((long)0L), ArgumentMatchers.anyLong());
        ((MockClient)inOrder.verify((Object)this.client)).poll(ArgumentMatchers.eq((long)this.accumulator.getDeliveryTimeoutMs()), ArgumentMatchers.anyLong());
        ((MockClient)inOrder.verify((Object)this.client)).poll(AdditionalMatchers.geq((long)1L), ArgumentMatchers.anyLong());
    }

    /**
     * Records appended to two partitions must both fail with a {@code TimeoutException} cause
     * when the delivery timeout elapses before the queued response is processed.
     */
    @Test
    public void testExpiredBatchesInMultiplePartitions() throws Exception {
        final long deliveryTimeoutMs = 1500L;
        setupWithTransactionState(null, true, null);

        FutureRecordMetadata tp0Future = appendToAccumulator(tp0, time.milliseconds(), "k1", "v1");
        FutureRecordMetadata tp1Future = appendToAccumulator(tp1, time.milliseconds(), "k2", "v2");
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size(), "Expect one in-flight batch in accumulator");

        // Queue a successful response for tp0 only, then let the delivery timeout pass.
        Map<TopicPartition, ProduceResponse.PartitionResponse> partitionResponses = new HashMap<>();
        partitionResponses.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
        client.respond((AbstractResponse) new ProduceResponse(partitionResponses));
        time.sleep(deliveryTimeoutMs);
        sender.runOnce();
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size(), "Expect zero in-flight batch in accumulator");

        // Both futures must surface a TimeoutException as the cause.
        ExecutionException tp0Failure = Assertions.assertThrows(ExecutionException.class, tp0Future::get);
        Assertions.assertTrue(tp0Failure.getCause() instanceof TimeoutException);
        ExecutionException tp1Failure = Assertions.assertThrows(ExecutionException.class, tp1Future::get);
        Assertions.assertTrue(tp1Failure.getCause() instanceof TimeoutException);
    }

    /**
     * After {@code initiateClose()} a commit that is begun before {@code run()} must still be
     * sent: the prepared EndTxn response guarded by a COMMIT matcher has to be matched by the
     * time {@code run()} returns.
     */
    @Test
    public void testTransactionalRequestsSentOnShutdown() {
        int maxRetries = 1;
        // try-with-resources closes the metrics even if building the registry or the sender
        // throws, and keeps the original failure instead of one raised by close().
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            TransactionManager txnManager = new TransactionManager(this.logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100L, this.apiVersions);
            Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, false, 0x100000, -1, maxRetries, senderMetrics, (Time)this.time, 5000, 50L, txnManager, this.apiVersions);
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, 0);
            TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1);
            this.setupWithTransactionState(txnManager);
            this.doInitTransactions(txnManager, producerIdAndEpoch);

            // Open a transaction and register the partition with it.
            txnManager.beginTransaction();
            txnManager.maybeAddPartition(tp);
            this.client.prepareResponse((AbstractResponse)new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
            sender.runOnce();

            // Start closing, then begin the commit; run() is expected to send the EndTxn request.
            sender.initiateClose();
            txnManager.beginCommit();
            AssertEndTxnRequestMatcher endTxnMatcher = new AssertEndTxnRequestMatcher(TransactionResult.COMMIT);
            this.client.prepareResponse(endTxnMatcher, (AbstractResponse)new EndTxnResponse(new EndTxnResponseData().setErrorCode(Errors.NONE.code()).setThrottleTimeMs(0)));
            sender.run();
            Assertions.assertTrue(endTxnMatcher.matched, "Response didn't match in test");
        }
    }

    /**
     * With a linger of 50 ms, appended records are not sent on the first pass, but are sent once
     * a commit begins. In a following transaction without a commit they stay undrained until the
     * full linger time has elapsed.
     */
    @Test
    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
        try (Metrics txnMetrics = new Metrics()) {
            final int lingerMs = 50;
            SenderMetricsRegistry registry = new SenderMetricsRegistry(txnMetrics);
            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100L, apiVersions);
            setupWithTransactionState(txnManager, lingerMs);
            Sender txnSender = new Sender(logContext, (KafkaClient) client, metadata, accumulator, false, 1024 * 1024, -1, 1, registry, (Time) time, 5000, 50L, txnManager, apiVersions);
            doInitTransactions(txnManager, new ProducerIdAndEpoch(123456L, 0));

            // First transaction: two records are appended but nothing is sent on the first pass.
            txnManager.beginTransaction();
            addPartitionToTxn(txnSender, txnManager, tp0);
            appendToAccumulator(tp0);
            appendToAccumulator(tp0);
            txnSender.runOnce();
            Assertions.assertFalse(client.hasInFlightRequests());

            // Beginning the commit gets the records on the wire, followed by the EndTxn request.
            TransactionalRequestResult commitResult = txnManager.beginCommit();
            ProducerTestUtils.runUntil(txnSender, client::hasInFlightRequests);
            respondToProduce(tp0, Errors.NONE, 1L);
            ProducerTestUtils.runUntil(txnSender, txnManager::hasInFlightRequest);
            respondToEndTxn(Errors.NONE);
            ProducerTestUtils.runUntil(txnSender, txnManager::isReady);
            Assertions.assertTrue(commitResult.isSuccessful());
            commitResult.await();

            // Second transaction: one millisecond short of the linger time nothing is drained.
            txnManager.beginTransaction();
            addPartitionToTxn(txnSender, txnManager, tp0);
            appendToAccumulator(tp0);
            appendToAccumulator(tp0);
            time.sleep(lingerMs - 1);
            txnSender.runOnce();
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertTrue(accumulator.hasUndrained());

            // After the final millisecond the records are drained and sent.
            time.sleep(1L);
            ProducerTestUtils.runUntil(txnSender, client::hasInFlightRequests);
            Assertions.assertFalse(accumulator.hasUndrained());
        }
    }

    /**
     * While produce requests are still in flight, a commit in progress must not have an in-flight
     * transactional request; that request appears only after the produce responses have arrived.
     */
    @Test
    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
        try (Metrics txnMetrics = new Metrics()) {
            SenderMetricsRegistry registry = new SenderMetricsRegistry(txnMetrics);
            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100L, apiVersions);
            setupWithTransactionState(txnManager);
            Sender txnSender = new Sender(logContext, (KafkaClient) client, metadata, accumulator, false, 1024 * 1024, -1, 1, registry, (Time) time, 5000, 50L, txnManager, apiVersions);
            doInitTransactions(txnManager, new ProducerIdAndEpoch(123456L, 0));

            // Send a first record and leave its request unanswered.
            txnManager.beginTransaction();
            addPartitionToTxn(txnSender, txnManager, tp0);
            appendToAccumulator(tp0);
            ProducerTestUtils.runUntil(txnSender, () -> client.requests().size() == 1);
            Assertions.assertFalse(accumulator.hasUndrained());
            Assertions.assertTrue(client.hasInFlightRequests());
            Assertions.assertTrue(txnManager.hasInflightBatches(tp0));

            // Append a second record and begin the commit while both requests are pending.
            appendToAccumulator(tp0);
            txnManager.beginCommit();
            ProducerTestUtils.runUntil(txnSender, () -> client.requests().size() == 2);
            Assertions.assertTrue(txnManager.isCompleting());
            Assertions.assertFalse(txnManager.hasInFlightRequest());
            Assertions.assertTrue(txnManager.hasInflightBatches(tp0));

            // Answer both produce requests; only then does the transactional request go out.
            respondToProduce(tp0, Errors.NONE, 0L);
            respondToProduce(tp0, Errors.NONE, 1L);
            ProducerTestUtils.runUntil(txnSender, txnManager::hasInFlightRequest);
            respondToEndTxn(Errors.NONE);
            ProducerTestUtils.runUntil(txnSender, txnManager::isReady);
        }
    }

    /**
     * Registers {@code tp} with the transaction, prepares a successful AddPartitionsToTxn
     * response and runs {@code sender} until the manager reports the partition as added.
     */
    private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
        txnManager.maybeAddPartition(tp);
        AbstractResponse addPartitionsResponse = new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE));
        client.prepareResponse(addPartitionsResponse);
        ProducerTestUtils.runUntil(sender, () -> txnManager.isPartitionAdded(tp));
        // No transactional request may remain in flight once the partition is added.
        Assertions.assertFalse(txnManager.hasInFlightRequest());
    }

    /**
     * Answers a pending {@code ProduceRequest} with a produce response for {@code tp} carrying
     * the given error and offset (throttle time 0).
     */
    private void respondToProduce(TopicPartition tp, Errors error, long offset) {
        MockClient.RequestMatcher isProduceRequest = body -> body instanceof ProduceRequest;
        AbstractResponse response = produceResponse(tp, offset, error, 0);
        client.respond(isProduceRequest, response);
    }

    /**
     * Answers a pending {@code EndTxnRequest} with a response carrying the given error code and
     * a throttle time of 0.
     */
    private void respondToEndTxn(Errors error) {
        MockClient.RequestMatcher isEndTxnRequest = body -> body instanceof EndTxnRequest;
        AbstractResponse response = new EndTxnResponse(
            new EndTxnResponseData()
                .setErrorCode(error.code())
                .setThrottleTimeMs(0));
        client.respond(isEndTxnRequest, response);
    }

    /**
     * When the sender is closed with {@code initiateClose()} while a transaction is open and no
     * commit or abort was begun by the test, {@code run()} must send an EndTxn request that the
     * ABORT matcher accepts.
     */
    @Test
    public void testIncompleteTransactionAbortOnShutdown() {
        int maxRetries = 1;
        // try-with-resources closes the metrics even if building the registry or the sender
        // throws, and keeps the original failure instead of one raised by close().
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            TransactionManager txnManager = new TransactionManager(this.logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100L, this.apiVersions);
            Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, false, 0x100000, -1, maxRetries, senderMetrics, (Time)this.time, 5000, 50L, txnManager, this.apiVersions);
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, 0);
            TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1);
            this.setupWithTransactionState(txnManager);
            this.doInitTransactions(txnManager, producerIdAndEpoch);

            // Open a transaction and register the partition with it.
            txnManager.beginTransaction();
            txnManager.maybeAddPartition(tp);
            this.client.prepareResponse((AbstractResponse)new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
            sender.runOnce();

            // Close without committing; the prepared response is only consumed by an ABORT EndTxn.
            sender.initiateClose();
            AssertEndTxnRequestMatcher endTxnMatcher = new AssertEndTxnRequestMatcher(TransactionResult.ABORT);
            this.client.prepareResponse(endTxnMatcher, (AbstractResponse)new EndTxnResponse(new EndTxnResponseData().setErrorCode(Errors.NONE.code()).setThrottleTimeMs(0)));
            sender.run();
            Assertions.assertTrue(endTxnMatcher.matched, "Response didn't match in test");
        }
    }

    /**
     * When the sender is force-closed while a commit is pending, awaiting the commit result must
     * throw a {@code KafkaException}. The test is bounded by a 10-unit JUnit timeout.
     */
    @Timeout(value=10L)
    @Test
    public void testForceShutdownWithIncompleteTransaction() {
        int maxRetries = 1;
        // try-with-resources closes the metrics even if building the registry or the sender
        // throws, and keeps the original failure instead of one raised by close().
        try (Metrics m = new Metrics()) {
            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
            TransactionManager txnManager = new TransactionManager(this.logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100L, this.apiVersions);
            Sender sender = new Sender(this.logContext, (KafkaClient)this.client, this.metadata, this.accumulator, false, 0x100000, -1, maxRetries, senderMetrics, (Time)this.time, 5000, 50L, txnManager, this.apiVersions);
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, 0);
            TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1);
            this.setupWithTransactionState(txnManager);
            this.doInitTransactions(txnManager, producerIdAndEpoch);

            // Open a transaction and register the partition with it.
            txnManager.beginTransaction();
            txnManager.maybeAddPartition(tp);
            this.client.prepareResponse((AbstractResponse)new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
            sender.runOnce();

            // Begin the commit, then force-close before any EndTxn response is prepared.
            TransactionalRequestResult commitResult = txnManager.beginCommit();
            sender.forceClose();
            sender.run();
            Assertions.assertThrows(KafkaException.class, () -> commitResult.await(), "The test expected to throw a KafkaException for forcefully closing the sender");
        }
    }

    /**
     * A record appended inside a transaction that is then aborted must have its future fail with
     * {@code TransactionAbortedException}.
     */
    @Test
    public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException, ExecutionException {
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, 0);
        TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100L, apiVersions);
        setupWithTransactionState(txnManager, false, null);
        doInitTransactions(txnManager, producerIdAndEpoch);

        // Open a transaction and add the partition to it.
        txnManager.beginTransaction();
        txnManager.maybeAddPartition(tp0);
        AbstractResponse addPartitionsResponse = new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE));
        client.prepareResponse(addPartitionsResponse);
        sender.runOnce();

        // Append a record, then abort before it is sent.
        FutureRecordMetadata sendFuture = appendToAccumulator(tp0, time.milliseconds(), "key", "value");
        txnManager.beginAbort();
        sender.runOnce();
        TestUtils.assertFutureThrows(sendFuture, TransactionAbortedException.class);
    }

    /**
     * Initializes transactions against a spied client and verifies how often the client was
     * polled with a timeout of 50.
     */
    @Test
    public void testDoNotPollWhenNoRequestSent() {
        // Spy on a fresh MockClient so poll() invocations can be counted.
        this.client = (MockClient)Mockito.spy((Object)new MockClient((Time)this.time, (Metadata)this.metadata));
        TransactionManager txnManager = new TransactionManager(this.logContext, "testDoNotPollWhenNoRequestSent", 6000, 100L, this.apiVersions);
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, 0);
        this.setupWithTransactionState(txnManager);
        this.doInitTransactions(txnManager, producerIdAndEpoch);
        // Exactly two polls with timeout 50 are expected (presumably the retry backoff — confirm against the Sender constructor).
        ((MockClient)Mockito.verify((Object)this.client, (VerificationMode)Mockito.times((int)2))).poll(ArgumentMatchers.eq((long)50L), ArgumentMatchers.anyLong());
    }

    /**
     * A transactional batch that is rejected with MESSAGE_TOO_LARGE and later answered
     * successfully must leave no in-flight batches, and a further pass after 2000 ms must
     * complete without throwing.
     */
    @Test
    public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, 0);
        TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100L, apiVersions);
        setupWithTransactionState(txnManager, false, null);
        doInitTransactions(txnManager, producerIdAndEpoch);

        // Open a transaction and add the partition to it.
        txnManager.beginTransaction();
        txnManager.maybeAddPartition(tp0);
        AbstractResponse addPartitionsResponse = new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE));
        client.prepareResponse(addPartitionsResponse);
        sender.runOnce();

        // Two records end up in a single in-flight batch.
        appendToAccumulator(tp0, time.milliseconds(), "key1", "value1");
        appendToAccumulator(tp0, time.milliseconds(), "key2", "value2");
        sender.runOnce();
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        // Reject the batch as too large, run two passes, then answer successfully.
        client.respond((AbstractResponse) produceResponse(tp0, -1L, Errors.MESSAGE_TOO_LARGE, -1));
        sender.runOnce();
        sender.runOnce();
        client.respond((AbstractResponse) produceResponse(tp0, 0L, Errors.NONE, 0));
        sender.runOnce();
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // A pass well after the fact must not fail.
        time.sleep(2000L);
        sender.runOnce();
    }

    /**
     * Without a custom message, the failure must carry the default message of
     * {@code Errors.INVALID_REQUEST}.
     */
    @Test
    public void testDefaultErrorMessage() throws Exception {
        ProduceResponse response = produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0);
        String defaultMessage = Errors.INVALID_REQUEST.message();
        verifyErrorMessage(response, defaultMessage);
    }

    /**
     * An error message supplied in the produce response must be the message of the failure.
     */
    @Test
    public void testCustomErrorMessage() throws Exception {
        final String customMessage = "testCustomErrorMessage";
        ProduceResponse response = produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0, -1L, customMessage);
        verifyErrorMessage(response, customMessage);
    }

    /**
     * After COORDINATOR_LOAD_IN_PROGRESS answers to InitProducerId, each following attempt must
     * happen exactly 50 ms of mock time after the previous one.
     */
    @Test
    public void testSenderShouldRetryWithBackoffOnRetriableError() {
        final long producerId = 343434L;
        setupWithTransactionState(createTransactionManager());

        // The first attempt happens without any time passing.
        final long startMs = time.milliseconds();
        prepareAndReceiveInitProducerId(producerId, (short) -1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
        final long firstAttemptMs = time.milliseconds();
        Assertions.assertEquals(startMs, firstAttemptMs);

        // The second attempt comes 50 ms later.
        prepareAndReceiveInitProducerId(producerId, (short) -1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
        final long secondAttemptMs = time.milliseconds();
        Assertions.assertEquals(50L, secondAttemptMs - firstAttemptMs);

        // So does the final, successful attempt.
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertEquals(50L, time.milliseconds() - secondAttemptMs);
    }

    /**
     * Sends one record, answers with {@code response}, and asserts that the record's future
     * fails with an {@code InvalidRequestException} whose message equals {@code expectedMessage}.
     */
    private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
        FutureRecordMetadata sendFuture = appendToAccumulator(tp0, 0L, "key", "value");

        // Two passes before the response is queued.
        sender.runOnce();
        sender.runOnce();

        // Queue the response, then two more passes.
        client.respond((AbstractResponse) response);
        sender.runOnce();
        sender.runOnce();

        ExecutionException thrown = Assertions.assertThrows(ExecutionException.class, () -> SenderTest.lambda$verifyErrorMessage$18((Future)sendFuture));
        Throwable cause = thrown.getCause();
        Assertions.assertEquals(InvalidRequestException.class, cause.getClass());
        Assertions.assertEquals(expectedMessage, cause.getMessage());
    }

    /**
     * Builds a matcher that accepts a {@code ProduceRequest} only when it holds exactly one batch
     * for {@code tp} with base offset 0 and the given base sequence, producer id, producer epoch
     * and transactional flag.
     */
    private MockClient.RequestMatcher produceRequestMatcher(TopicPartition tp, ProducerIdAndEpoch producerIdAndEpoch, int sequence, boolean isTransactional) {
        return candidate -> {
            if (candidate instanceof ProduceRequest) {
                Map<TopicPartition, MemoryRecords> recordsByPartition = SenderTest.partitionRecords((ProduceRequest) candidate);
                MemoryRecords partitionRecords = recordsByPartition.get(tp);
                if (partitionRecords != null) {
                    List batchList = TestUtils.toList(partitionRecords.batches());
                    if (batchList.size() == 1) {
                        MutableRecordBatch onlyBatch = (MutableRecordBatch) batchList.get(0);
                        return onlyBatch.baseOffset() == 0L
                            && onlyBatch.baseSequence() == sequence
                            && onlyBatch.producerId() == producerIdAndEpoch.producerId
                            && onlyBatch.producerEpoch() == producerIdAndEpoch.epoch
                            && onlyBatch.isTransactional() == isTransactional;
                    }
                }
            }
            // Not a produce request, no records for the partition, or not exactly one batch.
            return false;
        };
    }

    /**
     * Appends a record with key "key" and value "value" to {@code tp}, timestamped with the
     * current mock time, and returns its future.
     */
    private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
        long nowMs = time.milliseconds();
        return appendToAccumulator(tp, nowMs, "key", "value");
    }

    /**
     * Appends one record to the accumulator for {@code tp} and returns the record's future.
     *
     * <p>Key and value are encoded as UTF-8 explicitly, so the bytes handed to the accumulator do
     * not depend on the platform default charset.
     *
     * @param tp        destination topic-partition
     * @param timestamp record timestamp passed to the accumulator
     * @param key       record key
     * @param value     record value
     * @return the future of the appended record
     * @throws InterruptedException if the accumulator's append is interrupted
     */
    private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException {
        byte[] keyBytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] valueBytes = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return this.accumulator.append((String)tp.topic(), (int)tp.partition(), (long)timestamp, (byte[])keyBytes, (byte[])valueBytes, (Header[])Record.EMPTY_HEADERS, null, (long)1000L, (boolean)false, (long)this.time.milliseconds(), (Cluster)TestUtils.singletonCluster()).future;
    }

    /**
     * Builds a produce response with a single partition entry for {@code tp}, with no record
     * errors and the given error message.
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
        ProduceResponse.PartitionResponse partitionResponse =
            new ProduceResponse.PartitionResponse(error, offset, -1L, logStartOffset, Collections.emptyList(), errorMessage);
        Map<TopicPartition, ProduceResponse.PartitionResponse> byPartition = Collections.singletonMap(tp, partitionResponse);
        ProduceResponse response = new ProduceResponse(byPartition, throttleTimeMs);
        return response;
    }

    /**
     * Builds a produce response from per-partition results, grouping the partition entries under
     * one topic entry per topic name.
     */
    private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> responses) {
        ProduceResponseData responseData = new ProduceResponseData();
        responses.forEach((partition, result) -> {
            // Reuse the topic entry if an earlier partition already created it.
            ProduceResponseData.TopicProduceResponse topicResponse = responseData.responses().find(partition.topic());
            if (topicResponse == null) {
                topicResponse = new ProduceResponseData.TopicProduceResponse().setName(partition.topic());
                responseData.responses().add(topicResponse);
            }
            ProduceResponseData.PartitionProduceResponse partitionResponse = new ProduceResponseData.PartitionProduceResponse()
                .setIndex(partition.partition())
                .setBaseOffset(result.offset)
                .setErrorCode(result.error.code())
                .setRecordErrors(result.recordErrors);
            topicResponse.partitionResponses().add(partitionResponse);
        });
        return new ProduceResponse(responseData);
    }

    /**
     * Convenience overload: builds a single-partition produce response with a log start offset
     * of -1 and no error message.
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
        final long logStartOffset = -1L;
        final String errorMessage = null;
        return produceResponse(tp, offset, error, throttleTimeMs, logStartOffset, errorMessage);
    }

    /**
     * Creates a transaction manager with a fresh log context and API versions, a null second
     * argument, and the numeric arguments 0 and 50.
     */
    private TransactionManager createTransactionManager() {
        LogContext freshLogContext = new LogContext();
        ApiVersions freshApiVersions = new ApiVersions();
        return new TransactionManager(freshLogContext, null, 0, 50L, freshApiVersions);
    }

    /**
     * Sets up the fixture with defaults: no ordering guarantee, default buffer pool, metadata
     * update enabled, {@code Integer.MAX_VALUE} retries and no linger.
     */
    private void setupWithTransactionState(TransactionManager transactionManager) {
        final boolean guaranteeOrder = false;
        final BufferPool customPool = null;
        final boolean updateMetadata = true;
        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE, 0);
    }

    /**
     * Sets up the fixture with the given linger time and otherwise default settings: no ordering
     * guarantee, default buffer pool, metadata update enabled, {@code Integer.MAX_VALUE} retries.
     */
    private void setupWithTransactionState(TransactionManager transactionManager, int lingerMs) {
        final boolean guaranteeOrder = false;
        final BufferPool customPool = null;
        final boolean updateMetadata = true;
        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, Integer.MAX_VALUE, lingerMs);
    }

    /**
     * Sets up the fixture with the given ordering flag and buffer pool; metadata update is
     * enabled, retries are {@code Integer.MAX_VALUE} and linger is 0.
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
        final boolean updateMetadata = true;
        final int retries = Integer.MAX_VALUE;
        final int lingerMs = 0;
        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, retries, lingerMs);
    }

    /**
     * Sets up the fixture with the given ordering flag, buffer pool and metadata-update flag;
     * retries are {@code Integer.MAX_VALUE} and linger is 0.
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, boolean updateMetadata) {
        final int retries = Integer.MAX_VALUE;
        final int lingerMs = 0;
        setupWithTransactionState(transactionManager, guaranteeOrder, customPool, updateMetadata, retries, lingerMs);
    }

    /**
     * Rebuilds the metrics, accumulator, sender metrics registry and sender fields of the
     * fixture, registers topic "test" with the metadata and, when {@code updateMetadata} is set,
     * pushes a metadata update to the client.
     *
     * @param customPool buffer pool to use; a new 1 MiB pool is created when this is null
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool, boolean updateMetadata, int retries, int lingerMs) {
        final String metricGroup = "producer-metrics";
        final long poolSizeBytes = 1024L * 1024L;

        MetricConfig config = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
        this.metrics = new Metrics(config, (Time) this.time);

        BufferPool pool = customPool;
        if (pool == null) {
            pool = new BufferPool(poolSizeBytes, this.batchSize, this.metrics, (Time) this.time, metricGroup);
        }

        this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.NONE, lingerMs, 0L, 1500, this.metrics, metricGroup, (Time) this.time, this.apiVersions, transactionManager, pool);
        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
        this.sender = new Sender(this.logContext, (KafkaClient) this.client, this.metadata, this.accumulator, guaranteeOrder, 1024 * 1024, -1, retries, this.senderMetricsRegistry, (Time) this.time, 5000, 50L, transactionManager, this.apiVersions);

        this.metadata.add("test", this.time.milliseconds());
        if (updateMetadata) {
            this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        }
    }

    /**
     * Appends one record, runs the sender once and asserts that the record's future is done and
     * fails with a cause assignable to {@code expectedError}.
     */
    private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertTrue(sendFuture.isDone());
        try {
            sendFuture.get();
        }
        catch (ExecutionException failure) {
            Class<?> causeType = failure.getCause().getClass();
            Assertions.assertTrue(expectedError.isAssignableFrom(causeType));
            return;
        }
        // Reached only when get() returned normally.
        Assertions.fail("Future should have raised " + expectedError.getSimpleName());
    }

    /**
     * Convenience overload that uses producer epoch 0.
     */
    private void prepareAndReceiveInitProducerId(long producerId, Errors error) {
        final short initialEpoch = (short) 0;
        prepareAndReceiveInitProducerId(producerId, initialEpoch, error);
    }

    /**
     * Prepares an InitProducerId response for a request without a transactional id and runs the
     * sender once. For any error other than {@code NONE} the returned epoch is forced to -1.
     */
    private void prepareAndReceiveInitProducerId(long producerId, short producerEpoch, Errors error) {
        final short epochToReturn = error == Errors.NONE ? producerEpoch : (short) -1;
        MockClient.RequestMatcher nonTransactionalInit = body ->
            body instanceof InitProducerIdRequest
                && ((InitProducerIdRequest) body).data().transactionalId() == null;
        AbstractResponse response = initProducerIdResponse(producerId, epochToReturn, error);
        client.prepareResponse(nonTransactionalInit, response);
        sender.runOnce();
    }

    /**
     * Builds an InitProducerId response with the given producer id, epoch and error code and a
     * throttle time of 0.
     */
    private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
        return new InitProducerIdResponse(
            new InitProducerIdResponseData()
                .setErrorCode(error.code())
                .setProducerEpoch(producerEpoch)
                .setProducerId(producerId)
                .setThrottleTimeMs(0));
    }

    /**
     * Drives transaction initialization on the fixture's sender: prepares a FindCoordinator
     * response, runs two passes, prepares an InitProducerId response, runs one more pass, and
     * then requires the manager to hold a producer id.
     */
    private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        // Coordinator lookup.
        prepareFindCoordinatorResponse(Errors.NONE, transactionManager.transactionalId());
        sender.runOnce();
        sender.runOnce();

        // Producer id assignment.
        prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
        sender.runOnce();

        Assertions.assertTrue(transactionManager.hasProducerId());
        initResult.await();
    }

    /**
     * Prepares a FindCoordinator response for {@code txnid} that names the first node of the
     * current metadata.
     */
    private void prepareFindCoordinatorResponse(Errors error, String txnid) {
        Node firstNode = (Node) metadata.fetch().nodes().get(0);
        AbstractResponse response = FindCoordinatorResponse.prepareResponse(error, txnid, firstNode);
        client.prepareResponse(response);
    }

    /**
     * Prepares an InitProducerId response with the given error, producer id and epoch on the
     * client.
     */
    private void prepareInitProducerResponse(Errors error, long producerId, short producerEpoch) {
        AbstractResponse response = initProducerIdResponse(producerId, producerEpoch, error);
        client.prepareResponse(response);
    }

    /**
     * Asserts that {@code future} is done and fails with a cause assignable to
     * {@code expectedExceptionType}.
     */
    private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType) throws InterruptedException {
        Assertions.assertTrue(future.isDone());
        try {
            future.get();
        }
        catch (ExecutionException failure) {
            Class<?> actualType = failure.getCause().getClass();
            String mismatchMessage = "Unexpected cause " + actualType.getName();
            Assertions.assertTrue(expectedExceptionType.isAssignableFrom(actualType), mismatchMessage);
            return;
        }
        // Reached only when get() returned normally.
        Assertions.fail("Future should have raised " + expectedExceptionType.getName());
    }

    /**
     * Installs a mock client that stops offering a least-loaded node once it has a request in
     * flight, then sends one metadata request through it and polls until no node is offered.
     */
    private void createMockClientWithMaxFlightOneMetadataPending() {
        this.client = new MockClient(this.time, (Metadata) this.metadata) {
            // Refreshed on every poll(): true only while nothing is in flight.
            volatile boolean canSendMore = true;

            @Override
            public Node leastLoadedNode(long now) {
                for (Node candidate : SenderTest.this.metadata.fetch().nodes()) {
                    if (this.isReady(candidate, now) && this.canSendMore) {
                        return candidate;
                    }
                }
                return null;
            }

            @Override
            public List<ClientResponse> poll(long timeoutMs, long now) {
                this.canSendMore = this.inFlightRequestCount() < 1;
                return super.poll(timeoutMs, now);
            }
        };

        MetadataRequest.Builder metadataRequest = new MetadataRequest.Builder(Collections.emptyList(), false);
        Node firstNode = (Node) this.metadata.fetch().nodes().get(0);
        ClientRequest pendingRequest = this.client.newClientRequest(firstNode.idString(), (AbstractRequest.Builder<?>) metadataRequest, this.time.milliseconds(), true);

        // Poll until the node is ready, then send the metadata request.
        while (!this.client.ready(firstNode, this.time.milliseconds())) {
            this.client.poll(0L, this.time.milliseconds());
        }
        this.client.send(pendingRequest, this.time.milliseconds());

        // Keep polling until the client no longer offers a node.
        while (this.client.leastLoadedNode(this.time.milliseconds()) != null) {
            this.client.poll(0L, this.time.milliseconds());
        }
    }

    /**
     * Runs the sender up to five times, stopping early once {@code transactionManager} reports a
     * producer id, then asserts that the id is present and equals {@code producerIdAndEpoch}.
     *
     * @param transactionManager the manager expected to obtain a producer id
     * @param producerIdAndEpoch the producer id and epoch the manager must end up with
     */
    private void waitForProducerId(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        int attemptsLeft = 5;
        while (attemptsLeft > 0 && !transactionManager.hasProducerId()) {
            sender.runOnce();
            attemptsLeft--;
        }
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(producerIdAndEpoch, transactionManager.producerIdAndEpoch());
    }

    /**
     * Compiler-generated lambda body (named after {@code verifyErrorMessage}): waits up to five
     * seconds for {@code future} and casts its result to {@link RecordMetadata}. The value itself
     * is discarded; any exception from the wait or the cast propagates to the caller.
     */
    private static /* synthetic */ void lambda$verifyErrorMessage$18(Future future) throws Throwable {
        Object outcome = future.get(5L, TimeUnit.SECONDS);
        // The cast is kept so a result of the wrong type still fails here.
        RecordMetadata ignored = (RecordMetadata) outcome;
    }

    /**
     * Immutable holder pairing an offset with an error and a list of per-record errors.
     */
    private static class OffsetAndError {
        final long offset;
        final Errors error;
        final List<ProduceResponseData.BatchIndexAndErrorMessage> recordErrors;

        /**
         * Creates a holder with an empty list of record errors.
         *
         * @param offset the offset to store
         * @param error the error to store
         */
        OffsetAndError(long offset, Errors error) {
            this(offset, error, Collections.emptyList());
        }

        /**
         * Creates a holder with explicit record errors.
         *
         * @param offset the offset to store
         * @param error the error to store
         * @param recordErrors the per-record errors to store; kept by reference, not copied
         */
        OffsetAndError(long offset, Errors error, List<ProduceResponseData.BatchIndexAndErrorMessage> recordErrors) {
            this.offset = offset;
            this.error = error;
            this.recordErrors = recordErrors;
        }
    }

    /**
     * A {@link BufferPool} that records every buffer returned by {@link #allocate(int, long)} and
     * rejects deallocation of any buffer it did not record.
     *
     * <p>Buffers are tracked by reference identity (an {@link IdentityHashMap}), not by content
     * equality, so two buffers with equal contents are still distinct entries.
     */
    private class MatchingBufferPool extends BufferPool {
        // Buffers handed out and not yet returned; the value is always Boolean.TRUE.
        IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;

        MatchingBufferPool(long totalSize, int batchSize, Metrics metrics, Time time, String metricGrpName) {
            super(totalSize, batchSize, metrics, time, metricGrpName);
            this.allocatedBuffers = new IdentityHashMap<>();
        }

        /**
         * Allocates through the parent pool and records the returned buffer as outstanding.
         *
         * @param size the size passed to the parent pool
         * @param maxTimeToBlockMs the blocking limit passed to the parent pool
         * @return the buffer obtained from the parent pool
         * @throws InterruptedException if the parent allocation is interrupted
         */
        public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
            ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs);
            this.allocatedBuffers.put(buffer, Boolean.TRUE);
            return buffer;
        }

        /**
         * Returns a previously recorded buffer to the parent pool.
         *
         * @param buffer the buffer being returned
         * @param size the size passed to the parent pool
         * @throws IllegalStateException if {@code buffer} is not currently recorded as allocated
         */
        public void deallocate(ByteBuffer buffer, int size) {
            // Stored values are never null, so a null result means the buffer was not tracked.
            if (this.allocatedBuffers.remove(buffer) == null) {
                throw new IllegalStateException("Deallocating a buffer that is not allocated");
            }
            super.deallocate(buffer, size);
        }

        /**
         * @return {@code true} when every recorded buffer has been deallocated again
         */
        public boolean allMatch() {
            return this.allocatedBuffers.isEmpty();
        }
    }

    /**
     * Request matcher that accepts only {@code EndTxnRequest}s and asserts that the request carries
     * the required transaction result. It records in {@code matched} whether such a request was seen.
     */
    class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
        private TransactionResult requiredResult;
        private boolean matched;

        /**
         * @param requiredResult the result every matched EndTxn request must carry
         */
        AssertEndTxnRequestMatcher(TransactionResult requiredResult) {
            this.requiredResult = requiredResult;
        }

        /**
         * @param body the request to inspect
         * @return {@code true} for an {@code EndTxnRequest} (after asserting its result),
         *         {@code false} for any other request type
         */
        @Override
        public boolean matches(AbstractRequest body) {
            if (!(body instanceof EndTxnRequest)) {
                return false;
            }
            EndTxnRequest endTxn = (EndTxnRequest) body;
            Assertions.assertSame(requiredResult, endTxn.result());
            matched = true;
            return true;
        }
    }
}

