/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.SenderMetricsRegistry;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TransactionManagerTest {
    // Configuration constants for the producer components under test.
    private static final int MAX_REQUEST_SIZE = 0x100000; // 1,048,576 bytes
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = Integer.MAX_VALUE;
    private static final String CLIENT_ID = "clientId";
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private static final long DEFAULT_RETRY_BACKOFF_MS = 100L;
    // Transactional id and transaction timeout of the producer under test.
    private final String transactionalId = "foobar";
    private final int transactionTimeoutMs = 1121;
    // Topic name and two of its partitions used as targets by the tests.
    private final String topic = "test";
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    // Mock clock shared by the metadata and the mock network client below.
    private MockTime time = new MockTime();
    private ProducerMetadata metadata = new ProducerMetadata(0L, Long.MAX_VALUE, new LogContext(), new ClusterResourceListeners(), (Time)this.time);
    private MockClient client = new MockClient((Time)this.time, (Metadata)this.metadata);
    private ApiVersions apiVersions = new ApiVersions();
    // The following four fields are (re)assigned in setup() before every test.
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private TransactionManager transactionManager = null;
    private Node brokerNode = null;
    private final LogContext logContext = new LogContext();

    /**
     * Rebuilds the objects under test before every test method: a transactional
     * {@link TransactionManager}, a {@link RecordAccumulator} and a {@link Sender} wired to the
     * mock client. Finally registers the test topic with the metadata and pushes a metadata
     * update for it through the mock client.
     */
    @Before
    public void setup() {
        Map<String, String> metricTags = new LinkedHashMap<>();
        metricTags.put("client-id", CLIENT_ID);
        int batchSize = 16 * 1024;
        int deliveryTimeoutMs = 3000;
        long totalSize = 1024 * 1024;
        String metricGrpName = "producer-metrics";
        MetricConfig metricConfig = new MetricConfig().tags(metricTags);

        this.brokerNode = new Node(0, "localhost", 2211);
        // Use the declared fields/constants rather than repeating their literal values.
        this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs,
                DEFAULT_RETRY_BACKOFF_MS);

        Metrics metrics = new Metrics(metricConfig, time);
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);
        BufferPool bufferPool = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
                deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, bufferPool);
        // ACKS_ALL is a short constant; a bare int literal -1 is not accepted for a short parameter.
        this.sender = new Sender(logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                MAX_RETRIES, senderMetrics, time, REQUEST_TIMEOUT, 50L, transactionManager, apiVersions);

        metadata.add(topic);
        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 2)));
    }

    /**
     * Initiates sender close while a record appended inside a transaction is still pending,
     * then begins a commit and checks that both the commit result and the record future
     * complete.
     */
    @Test
    public void testSenderShutdownWithPendingTransactions() throws Exception {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata pendingSend = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        prepareProduceResponse(Errors.NONE, producerId, producerEpoch);

        // Close is requested before the sender has been polled at all.
        sender.initiateClose();
        sender.runOnce();

        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        sender.runOnce();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, producerEpoch);
        sender.runOnce();
        Assert.assertTrue(commitResult.isCompleted());

        // Run the sender's main loop; the test expects it to return and the send to be done.
        sender.run();
        Assert.assertTrue(pendingSend.isDone());
    }

    /**
     * After a commit has begun, {@code nextRequestHandler(true)} must return {@code null}
     * while {@code nextRequestHandler(false)} must return an EndTxn handler. The boolean is
     * presumably "has incomplete batches", going by the test name.
     */
    @Test
    public void testEndTxnNotSentIfIncompleteBatches() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));

        transactionManager.beginCommit();
        Assert.assertNull(transactionManager.nextRequestHandler(true));

        TransactionManager.TxnRequestHandler endTxnHandler = transactionManager.nextRequestHandler(false);
        Assert.assertTrue(endTxnHandler.isEndTxn());
    }

    /**
     * Calling {@code failIfNotReadyForSend()} on the manager created by {@code setup()},
     * without any further initialisation, must throw {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testFailIfNotReadyForSendNoProducerId() {
        transactionManager.failIfNotReadyForSend();
    }

    /**
     * A manager built with the no-arg constructor (the idempotent-only case, per the test
     * name) must not throw from {@code failIfNotReadyForSend()}.
     */
    @Test
    public void testFailIfNotReadyForSendIdempotentProducer() {
        new TransactionManager().failIfNotReadyForSend();
    }

    /**
     * Once a no-arg-constructed manager has been moved to the fatal error state,
     * {@code failIfNotReadyForSend()} must throw {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
        TransactionManager idempotentManager = new TransactionManager();
        KafkaException fatalCause = new KafkaException();
        idempotentManager.transitionToFatalError(fatalCause);
        idempotentManager.failIfNotReadyForSend();
    }

    /**
     * With transactions initialised but no transaction begun,
     * {@code failIfNotReadyForSend()} must throw {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testFailIfNotReadyForSendNoOngoingTransaction() {
        final long producerId = 13131L;
        final short producerEpoch = 1;

        doInitTransactions(producerId, producerEpoch);
        transactionManager.failIfNotReadyForSend();
    }

    /**
     * After an abortable error is recorded inside an open transaction,
     * {@code failIfNotReadyForSend()} must throw {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendAfterAbortableError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);

        transactionManager.beginTransaction();
        KafkaException abortableCause = new KafkaException();
        transactionManager.transitionToAbortableError(abortableCause);
        transactionManager.failIfNotReadyForSend();
    }

    /**
     * After a fatal error is recorded on an initialised manager,
     * {@code failIfNotReadyForSend()} must throw {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendAfterFatalError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);

        KafkaException fatalCause = new KafkaException();
        transactionManager.transitionToFatalError(fatalCause);
        transactionManager.failIfNotReadyForSend();
    }

    /**
     * Tracks {@code hasOngoingTransaction()} through init, begin, add-partition, begin-abort
     * and the EndTxn(ABORT) response. It must be false before begin, true until the abort
     * response is processed, and false afterwards.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulAbort() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(producerId, producerEpoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        sender.runOnce();

        transactionManager.beginAbort();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Tracks {@code hasOngoingTransaction()} through init, begin, add-partition, begin-commit
     * and the EndTxn(COMMIT) response. It must be false before begin, true until the commit
     * response is processed, and false afterwards.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulCommit() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(producerId, producerEpoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        sender.runOnce();

        transactionManager.beginCommit();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, producerEpoch);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Checks that {@code hasOngoingTransaction()} stays true after an abortable error and
     * after {@code beginAbort()}, and turns false only once the EndTxn(ABORT) response has
     * been processed.
     */
    @Test
    public void testHasOngoingTransactionAbortableError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(producerId, producerEpoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        sender.runOnce();

        KafkaException abortableCause = new KafkaException();
        transactionManager.transitionToAbortableError(abortableCause);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.beginAbort();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Checks that {@code hasOngoingTransaction()} becomes false as soon as the manager is
     * moved to the fatal error state in the middle of a transaction.
     */
    @Test
    public void testHasOngoingTransactionFatalError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(producerId, producerEpoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        sender.runOnce();

        KafkaException fatalCause = new KafkaException();
        transactionManager.transitionToFatalError(fatalCause);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Follows a partition through the "pending add" and "added" states, then checks that
     * adding the same partition a second time leaves those states unchanged.
     */
    @Test
    public void testMaybeAddPartitionToTransaction() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();

        // Requested but not yet acknowledged: pending, not added.
        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(fooPartition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(fooPartition));

        // After a successful AddPartitionsToTxn round trip: added, nothing pending.
        prepareAddPartitionsToTxn(fooPartition, Errors.NONE);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertTrue(transactionManager.isPartitionAdded(fooPartition));
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(fooPartition));

        // Re-adding an already added partition changes nothing.
        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertTrue(transactionManager.isPartitionAdded(fooPartition));
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(fooPartition));
    }

    /**
     * When the first AddPartitionsToTxn attempt is answered with CONCURRENT_TRANSACTIONS, the
     * next request handler must report a retry backoff of 20 ms.
     */
    @Test
    public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(fooPartition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(fooPartition));

        prepareAddPartitionsToTxn(fooPartition, Errors.CONCURRENT_TRANSACTIONS);
        sender.runOnce();

        TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
        Assert.assertNotNull(retryHandler);
        Assert.assertEquals(20L, retryHandler.retryBackoffMs());
    }

    /**
     * When AddPartitionsToTxn is answered with COORDINATOR_NOT_AVAILABLE, the next request
     * handler must report a retry backoff equal to {@code DEFAULT_RETRY_BACKOFF_MS} (100 ms).
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartitionToTransaction(fooPartition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(fooPartition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(fooPartition));

        prepareAddPartitionsToTxn(fooPartition, Errors.COORDINATOR_NOT_AVAILABLE);
        sender.runOnce();

        TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
        Assert.assertNotNull(retryHandler);
        Assert.assertEquals(DEFAULT_RETRY_BACKOFF_MS, retryHandler.retryBackoffMs());
    }

    /**
     * With one partition already added to the transaction, the handler for adding a second
     * partition must report a retry backoff equal to {@code DEFAULT_RETRY_BACKOFF_MS}
     * (100 ms).
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        final TopicPartition firstPartition = new TopicPartition("foo", 0);
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartitionToTransaction(firstPartition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(firstPartition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(firstPartition));

        prepareAddPartitionsToTxn(firstPartition, Errors.NONE);
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(firstPartition));

        final TopicPartition secondPartition = new TopicPartition("foo", 1);
        transactionManager.maybeAddPartitionToTransaction(secondPartition);

        // NOTE(review): this response is prepared but the sender is never polled before the
        // handler is inspected, so the CONCURRENT_TRANSACTIONS reply is not consumed within
        // this test. Confirm that this is intentional.
        prepareAddPartitionsToTxn(secondPartition, Errors.CONCURRENT_TRANSACTIONS);

        TransactionManager.TxnRequestHandler nextHandler = transactionManager.nextRequestHandler(false);
        Assert.assertNotNull(nextHandler);
        Assert.assertEquals(DEFAULT_RETRY_BACKOFF_MS, nextHandler.retryBackoffMs());
    }

    /**
     * Adding a partition on the manager created by {@code setup()}, before transactions are
     * initialised, must throw {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(fooPartition);
    }

    /**
     * Adding a partition after transactions are initialised but before a transaction is
     * begun must throw {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);

        TopicPartition fooPartition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(fooPartition);
    }

    /**
     * Adding a partition after the open transaction has hit an abortable error must throw
     * {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterAbortableError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();

        KafkaException abortableCause = new KafkaException();
        transactionManager.transitionToAbortableError(abortableCause);

        TopicPartition fooPartition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(fooPartition);
    }

    /**
     * Adding a partition after the manager has been moved to the fatal error state must
     * throw {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterFatalError() {
        final long producerId = 13131L;
        final short producerEpoch = 1;
        doInitTransactions(producerId, producerEpoch);

        KafkaException fatalCause = new KafkaException();
        transactionManager.transitionToFatalError(fatalCause);

        TopicPartition fooPartition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(fooPartition);
    }

    /**
     * A partition that has only been requested (no sender poll yet) must not be sendable once
     * the transaction is in the abortable error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        transactionManager.transitionToAbortableError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition whose add request has been polled by the sender but never answered must not
     * be sendable once the transaction is in the abortable error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        // No response is prepared, so this poll presumably leaves the add request in flight
        // (per the test name).
        sender.runOnce();
        transactionManager.transitionToAbortableError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition that has only been requested (no sender poll yet) must not be sendable once
     * the manager is in the fatal error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        transactionManager.transitionToFatalError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition whose add request has been polled by the sender but never answered must not
     * be sendable once the manager is in the fatal error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        // No response is prepared, so this poll presumably leaves the add request in flight
        // (per the test name).
        sender.runOnce();
        transactionManager.transitionToFatalError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition that was successfully added to the transaction must still be sendable after
     * the transaction enters the abortable error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.transitionToAbortableError(new KafkaException());

        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Even a partition that was successfully added to the transaction must not be sendable
     * once the manager is in the fatal error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.transitionToFatalError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * Inside an open transaction, a partition that was never added must not be sendable.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
    }

    /**
     * A no-arg-constructed manager must report sequence number 0 for a partition it has not
     * seen, and 3 after the sequence is incremented by 3.
     */
    @Test
    public void testDefaultSequenceNumber() {
        TransactionManager idempotentManager = new TransactionManager();

        // JUnit's assertEquals takes (expected, actual); keep that order so that failure
        // messages report the two values the right way round.
        Assert.assertEquals(0, idempotentManager.sequenceNumber(tp0).intValue());
        idempotentManager.incrementSequenceNumber(tp0, 3);
        Assert.assertEquals(3, idempotentManager.sequenceNumber(tp0).intValue());
    }

    /**
     * Writes five in-flight batches, completes the first, then offers an UNKNOWN_PRODUCER_ID
     * response for the second. The manager must allow a retry, and the four remaining batches
     * must be renumbered to base sequences 0..3 with the next sequence number at 4.
     */
    @Test
    public void testResetSequenceNumbersAfterUnknownProducerId() {
        final long producerId = 13131L;
        final short epoch = 1;
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        TransactionManager idempotentManager = new TransactionManager();
        idempotentManager.setProducerIdAndEpoch(producerIdAndEpoch);

        ProducerBatch b1 = writeIdempotentBatchWithValue(idempotentManager, tp0, "1");
        ProducerBatch b2 = writeIdempotentBatchWithValue(idempotentManager, tp0, "2");
        ProducerBatch b3 = writeIdempotentBatchWithValue(idempotentManager, tp0, "3");
        ProducerBatch b4 = writeIdempotentBatchWithValue(idempotentManager, tp0, "4");
        ProducerBatch b5 = writeIdempotentBatchWithValue(idempotentManager, tp0, "5");
        Assert.assertEquals(5, idempotentManager.sequenceNumber(tp0).intValue());

        // The first batch succeeds.
        long b1AppendTime = time.milliseconds();
        ProduceResponse.PartitionResponse b1Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        b1.done(500L, b1AppendTime, null);
        idempotentManager.handleCompletedBatch(b1, b1Response);

        // The second batch fails with UNKNOWN_PRODUCER_ID. The last constructor argument (600)
        // is presumably the log start offset; TODO confirm against PartitionResponse.
        ProduceResponse.PartitionResponse b2Response =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 600L);
        Assert.assertTrue(idempotentManager.canRetry(b2Response, b2));

        // The remaining four batches are expected to be renumbered from zero.
        Assert.assertEquals(4, idempotentManager.sequenceNumber(tp0).intValue());
        Assert.assertEquals(0, b2.baseSequence());
        Assert.assertEquals(1, b3.baseSequence());
        Assert.assertEquals(2, b4.baseSequence());
        Assert.assertEquals(3, b5.baseSequence());
    }

    /**
     * Writes five in-flight batches, completes the first and fails the second with the
     * non-retriable MESSAGE_TOO_LARGE. The later batches must be shifted down by one sequence
     * number, and a subsequent OUT_OF_ORDER_SEQUENCE_NUMBER on the third batch must be
     * retriable without changing those numbers again.
     */
    @Test
    public void testAdjustSequenceNumbersAfterFatalError() {
        final long producerId = 13131L;
        final short epoch = 1;
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        TransactionManager idempotentManager = new TransactionManager();
        idempotentManager.setProducerIdAndEpoch(producerIdAndEpoch);

        ProducerBatch b1 = writeIdempotentBatchWithValue(idempotentManager, tp0, "1");
        ProducerBatch b2 = writeIdempotentBatchWithValue(idempotentManager, tp0, "2");
        ProducerBatch b3 = writeIdempotentBatchWithValue(idempotentManager, tp0, "3");
        ProducerBatch b4 = writeIdempotentBatchWithValue(idempotentManager, tp0, "4");
        ProducerBatch b5 = writeIdempotentBatchWithValue(idempotentManager, tp0, "5");
        Assert.assertEquals(5, idempotentManager.sequenceNumber(tp0).intValue());

        // The first batch succeeds.
        long b1AppendTime = time.milliseconds();
        ProduceResponse.PartitionResponse b1Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, b1AppendTime, 0L);
        b1.done(500L, b1AppendTime, null);
        idempotentManager.handleCompletedBatch(b1, b1Response);

        // The second batch fails with an error the manager refuses to retry.
        ProduceResponse.PartitionResponse b2Response =
                new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE, -1L, -1L, 0L);
        Assert.assertFalse(idempotentManager.canRetry(b2Response, b2));
        b2.done(-1L, -1L, Errors.MESSAGE_TOO_LARGE.exception());
        idempotentManager.handleFailedBatch(b2, Errors.MESSAGE_TOO_LARGE.exception(), true);

        // The sequence numbers of the remaining batches are expected to shift down by one.
        Assert.assertEquals(4, idempotentManager.sequenceNumber(tp0).intValue());
        Assert.assertEquals(1, b3.baseSequence());
        Assert.assertEquals(2, b4.baseSequence());
        Assert.assertEquals(3, b5.baseSequence());

        // The third batch then reports an out-of-order sequence: retriable, numbering unchanged.
        ProduceResponse.PartitionResponse b3Response =
                new ProduceResponse.PartitionResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L, -1L, 0L);
        Assert.assertTrue(idempotentManager.canRetry(b3Response, b3));
        Assert.assertEquals(4, idempotentManager.sequenceNumber(tp0).intValue());
        Assert.assertEquals(1, b3.baseSequence());
        Assert.assertEquals(2, b4.baseSequence());
        Assert.assertEquals(3, b5.baseSequence());
    }

    /**
     * Writes a batch under one producer id, resets to a new producer id and writes a second
     * batch. A late UNKNOWN_PRODUCER_ID failure of the first batch must not be retriable and
     * must leave the sequence number and the next in-flight batch of the new producer id
     * untouched.
     */
    @Test
    public void testBatchFailureAfterProducerReset() {
        final long producerId = 13131L;
        final short epoch = 1;
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        TransactionManager idempotentManager = new TransactionManager();
        idempotentManager.setProducerIdAndEpoch(producerIdAndEpoch);
        ProducerBatch b1 = writeIdempotentBatchWithValue(idempotentManager, tp0, "1");

        // Switch to a new producer id (old id + 1) and write a batch under it.
        ProducerIdAndEpoch updatedProducerIdAndEpoch = new ProducerIdAndEpoch(producerId + 1, epoch);
        idempotentManager.resetProducerId();
        idempotentManager.setProducerIdAndEpoch(updatedProducerIdAndEpoch);
        ProducerBatch b2 = writeIdempotentBatchWithValue(idempotentManager, tp0, "2");
        Assert.assertEquals(1, idempotentManager.sequenceNumber(tp0).intValue());

        // The batch written under the old producer id now fails.
        ProduceResponse.PartitionResponse b1Response =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 400L);
        Assert.assertFalse(idempotentManager.canRetry(b1Response, b1));
        idempotentManager.handleFailedBatch(b1, Errors.UNKNOWN_PRODUCER_ID.exception(), true);

        Assert.assertEquals(1, idempotentManager.sequenceNumber(tp0).intValue());
        Assert.assertEquals(b2, idempotentManager.nextBatchBySequence(tp0));
    }

    /**
     * Writes a batch under one producer id, resets to a new producer id and writes a second
     * batch. A late successful completion of the first batch must leave the sequence number
     * and the next in-flight batch of the new producer id untouched.
     */
    @Test
    public void testBatchCompletedAfterProducerReset() {
        final long producerId = 13131L;
        final short epoch = 1;
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        TransactionManager idempotentManager = new TransactionManager();
        idempotentManager.setProducerIdAndEpoch(producerIdAndEpoch);
        ProducerBatch b1 = writeIdempotentBatchWithValue(idempotentManager, tp0, "1");

        // Switch to a new producer id (old id + 1) and write a batch under it.
        ProducerIdAndEpoch updatedProducerIdAndEpoch = new ProducerIdAndEpoch(producerId + 1, epoch);
        idempotentManager.resetProducerId();
        idempotentManager.setProducerIdAndEpoch(updatedProducerIdAndEpoch);
        ProducerBatch b2 = writeIdempotentBatchWithValue(idempotentManager, tp0, "2");
        Assert.assertEquals(1, idempotentManager.sequenceNumber(tp0).intValue());

        // The batch written under the old producer id now completes successfully.
        ProduceResponse.PartitionResponse b1Response =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        idempotentManager.handleCompletedBatch(b1, b1Response);

        Assert.assertEquals(1, idempotentManager.sequenceNumber(tp0).intValue());
        Assert.assertEquals(b2, idempotentManager.nextBatchBySequence(tp0));
    }

    /**
     * Builds a single-record batch for {@code tp}, stamps it with the manager's current producer
     * id/epoch and the partition's current sequence number, registers it as in flight and closes it.
     * The manager's sequence number for {@code tp} is advanced by one.
     *
     * @param manager transaction manager supplying and tracking the sequence state
     * @param tp      partition the batch is written for
     * @param value   record value placed in the batch
     * @return the closed batch
     */
    private ProducerBatch writeIdempotentBatchWithValue(TransactionManager manager, TopicPartition tp, String value) {
        // Capture the sequence before bumping it; the captured value is the batch's base sequence.
        int baseSequence = manager.sequenceNumber(tp);
        manager.incrementSequenceNumber(tp, 1);

        ProducerBatch producerBatch = batchWithValue(tp, value);
        producerBatch.setProducerState(manager.producerIdAndEpoch(), baseSequence, false);
        manager.addInFlightBatch(producerBatch);
        producerBatch.close();
        return producerBatch;
    }

    /**
     * Creates a batch for {@code tp} backed by a 64-byte buffer and appends one record with an
     * empty key, the bytes of {@code value}, and no headers, using the fixture clock for timestamps.
     *
     * @param tp    partition the batch belongs to
     * @param value record value; converted with {@link String#getBytes()}
     * @return the batch holding the appended record
     */
    private ProducerBatch batchWithValue(TopicPartition tp, String value) {
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(
                ByteBuffer.allocate(64), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        long nowMs = time.milliseconds();
        ProducerBatch producerBatch = new ProducerBatch(tp, recordsBuilder, nowMs);
        producerBatch.tryAppend(nowMs, new byte[0], value.getBytes(), new Header[0], null, nowMs);
        return producerBatch;
    }

    /**
     * Checks that incrementing a partition's sequence number past {@link Integer#MAX_VALUE}
     * wraps around to small non-negative values instead of going negative.
     */
    @Test
    public void testSequenceNumberOverflow() {
        TransactionManager transactionManager = new TransactionManager();
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assert.assertEquals(0L, transactionManager.sequenceNumber(tp0).intValue());
        transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
        Assert.assertEquals(Integer.MAX_VALUE, transactionManager.sequenceNumber(tp0).intValue());
        // MAX_VALUE + 100 is expected to wrap to 99.
        transactionManager.incrementSequenceNumber(tp0, 100);
        Assert.assertEquals(99L, transactionManager.sequenceNumber(tp0).intValue());
        // 99 + MAX_VALUE is expected to wrap to 98.
        transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
        Assert.assertEquals(98L, transactionManager.sequenceNumber(tp0).intValue());
    }

    /**
     * Checks that {@code resetProducerId()} returns a partition's sequence number to zero.
     */
    @Test
    public void testProducerIdReset() {
        TransactionManager transactionManager = new TransactionManager();
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assert.assertEquals(0L, transactionManager.sequenceNumber(tp0).intValue());
        transactionManager.incrementSequenceNumber(tp0, 3);
        Assert.assertEquals(3L, transactionManager.sequenceNumber(tp0).intValue());
        transactionManager.resetProducerId();
        Assert.assertEquals(0L, transactionManager.sequenceNumber(tp0).intValue());
    }

    /**
     * Walks through a complete successful transaction: add a partition, produce one record,
     * send consumer offsets to the transaction, and commit. State on the transaction manager is
     * asserted after each sender pass.
     *
     * @throws InterruptedException declared by the original signature
     */
    @Test
    public void testBasicTransaction() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        final String consumerGroupId = "myconsumergroup";

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.NONE, pid, epoch);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));

        // First pass: the partition becomes part of the transaction, the record is still pending.
        sender.runOnce();
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertFalse(responseFuture.isDone());

        // Second pass: the produce future completes.
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult addOffsetsResult =
                transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
        Assert.assertFalse(transactionManager.hasPendingOffsetCommits());

        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        sender.runOnce();
        // The offset commit is now pending; the overall result is not complete until it is answered.
        Assert.assertTrue(transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse(addOffsetsResult.isCompleted());

        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
        txnOffsetCommitResponse.put(tp1, Errors.NONE);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
        Assert.assertNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));

        // Two passes are needed before the group coordinator is known.
        sender.runOnce();
        sender.runOnce();
        Assert.assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue(transactionManager.hasPendingOffsetCommits());

        sender.runOnce();
        Assert.assertFalse(transactionManager.hasPendingOffsetCommits());
        Assert.assertTrue(addOffsetsResult.isCompleted());

        transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        Assert.assertFalse(transactionManager.isCompleting());
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * Queues two FindCoordinator responses for the transaction coordinator and checks that the
     * coordinator is resolved to the broker node after the second one has been processed.
     */
    @Test
    public void testDisconnectAndRetry() {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        transactionManager.initializeTransactions();

        // NOTE(review): the boolean argument is presumably the "disconnect" flag — confirm against the helper.
        prepareFindCoordinatorResponse(Errors.NONE, true, txnType, "foobar");
        sender.runOnce();

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        sender.runOnce();

        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));
    }

    /**
     * Answers the FindCoordinator request with an unsupported-version response and checks that
     * the transaction manager ends up in a fatal error caused by {@code UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedFindCoordinator() {
        transactionManager.initializeTransactions();
        client.prepareUnsupportedVersionResponse(body -> {
            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
            // Expected value goes first so a failure message reports expected/actual correctly.
            Assert.assertEquals(FindCoordinatorRequest.CoordinatorType.TRANSACTION,
                    FindCoordinatorRequest.CoordinatorType.forId((byte) findCoordinatorRequest.data().keyType()));
            Assert.assertEquals("foobar", findCoordinatorRequest.data().key());
            return true;
        });
        sender.runOnce();
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasFatalError());
        Assert.assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * Resolves the transaction coordinator successfully, then answers the InitProducerId request
     * with an unsupported-version response and checks that the transaction manager ends up in a
     * fatal error caused by {@code UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedInitTransactions() {
        transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        sender.runOnce();
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasError());
        Assert.assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        client.prepareUnsupportedVersionResponse(body -> {
            InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
            // Expected value goes first so a failure message reports expected/actual correctly.
            Assert.assertEquals("foobar", initProducerIdRequest.data.transactionalId());
            Assert.assertEquals(1121L, initProducerIdRequest.data.transactionTimeoutMs());
            return true;
        });
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasFatalError());
        Assert.assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * Sends offsets to a transaction and answers the TxnOffsetCommit with
     * {@code UNSUPPORTED_FOR_MESSAGE_FORMAT}. The send-offsets result must complete unsuccessfully
     * with {@code UnsupportedForMessageFormatException}, and the error must be fatal.
     */
    @Test
    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
        final String consumerGroupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition tp = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);

        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        sender.runOnce();
        sender.runOnce();

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        sender.runOnce();

        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch,
                Collections.singletonMap(tp, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof UnsupportedForMessageFormatException);
        assertFatalError(UnsupportedForMessageFormatException.class);
    }

    /**
     * Checks that the transaction coordinator is forgotten and looked up again when the first
     * InitProducerId exchange does not yield a producer id, and that initialization completes
     * with the expected producer id and epoch once a second InitProducerId response arrives.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectAfterSend() {
        final long pid = 13131L;
        final short epoch = 1;
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));

        // NOTE(review): the 'true' argument is presumably the "disconnect" flag — confirm against the helper.
        prepareInitPidResponse(Errors.NONE, true, pid, epoch);
        sender.runOnce();
        Assert.assertNull(transactionManager.coordinator(txnType));
        Assert.assertFalse(initPidResult.isCompleted());
        Assert.assertFalse(transactionManager.hasProducerId());

        // The coordinator is rediscovered, but initialization is still outstanding.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assert.assertFalse(initPidResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(initPidResult.isCompleted());
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Disconnects and blacks out the coordinator node before InitProducerId gets a response and
     * checks that the coordinator is forgotten, looked up again, and that initialization then
     * completes with the expected producer id and epoch.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectBeforeSend() {
        final long pid = 13131L;
        final short epoch = 1;
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));

        // Drop the connection and black the node out for 100 ms, then advance the clock past that window.
        client.disconnect(brokerNode.idString());
        client.blackout(brokerNode, 100L);
        sender.runOnce();
        time.sleep(110L);
        Assert.assertNull(transactionManager.coordinator(txnType));
        Assert.assertFalse(initPidResult.isCompleted());
        Assert.assertFalse(transactionManager.hasProducerId());

        // The coordinator is rediscovered, but initialization is still outstanding.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assert.assertFalse(initPidResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(initPidResult.isCompleted());
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Answers InitProducerId with {@code NOT_COORDINATOR} and checks that the coordinator is
     * forgotten, looked up again, and that initialization then completes with the expected
     * producer id and epoch.
     */
    @Test
    public void testLookupCoordinatorOnNotCoordinatorError() {
        final long pid = 13131L;
        final short epoch = 1;
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));

        prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
        sender.runOnce();
        Assert.assertNull(transactionManager.coordinator(txnType));
        Assert.assertFalse(initPidResult.isCompleted());
        Assert.assertFalse(transactionManager.hasProducerId());

        // The coordinator is rediscovered, but initialization is still outstanding.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assert.assertFalse(initPidResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(initPidResult.isCompleted());
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Answers FindCoordinator with {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} and checks that
     * initialization completes unsuccessfully with {@code TransactionalIdAuthorizationException}
     * and that the error is fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false,
                FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);

        // One more pass before the pending result is inspected.
        sender.runOnce();
        Assert.assertTrue(initResult.isCompleted());
        Assert.assertFalse(initResult.isSuccessful());
        Assert.assertTrue(initResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Answers InitProducerId with {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} and checks that
     * initialization completes unsuccessfully with {@code TransactionalIdAuthorizationException}
     * and that the error is fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
        final long pid = 13131L;

        TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        // The failed response carries epoch -1.
        prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, (short) -1);
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(initPidResult.isCompleted());
        Assert.assertFalse(initPidResult.isSuccessful());
        Assert.assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Sends offsets to a transaction and answers the group FindCoordinator with
     * {@code GROUP_AUTHORIZATION_FAILED}. The send-offsets result must complete unsuccessfully
     * with a {@code GroupAuthorizationException} naming the consumer group, and the error must be
     * abortable.
     */
    @Test
    public void testGroupAuthorizationFailureInFindCoordinator() {
        final String consumerGroupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId);

        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        sender.runOnce();
        sender.runOnce();

        prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false,
                FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);

        GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
        Assert.assertEquals(consumerGroupId, exception.groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * Sends offsets to a transaction and answers the TxnOffsetCommit with
     * {@code GROUP_AUTHORIZATION_FAILED}. The send-offsets result must complete unsuccessfully
     * with a {@code GroupAuthorizationException} naming the consumer group, no offset commits may
     * remain pending, and the error must be abortable.
     */
    @Test
    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
        final String consumerGroupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        // Named 'tp' rather than 'tp1' so it does not shadow the fixture field of that name.
        final TopicPartition tp = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);

        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        sender.runOnce();
        sender.runOnce();

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        sender.runOnce();

        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch,
                Collections.singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);
        Assert.assertFalse(transactionManager.hasPendingOffsetCommits());

        GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
        Assert.assertEquals(consumerGroupId, exception.groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * Sends offsets to a transaction and answers AddOffsetsToTxn with
     * {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED}. The send-offsets result must complete
     * unsuccessfully with {@code TransactionalIdAuthorizationException}, and the error must be fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
        final String consumerGroupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition tp = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);

        prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Sends offsets to a transaction and answers the TxnOffsetCommit with
     * {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED}. The send-offsets result must complete
     * unsuccessfully with {@code TransactionalIdAuthorizationException}, and the error must be fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
        final String consumerGroupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition tp = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(tp, new OffsetAndMetadata(39L)), consumerGroupId);

        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        sender.runOnce();
        sender.runOnce();

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        sender.runOnce();

        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch,
                Collections.singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Adds two partitions to a transaction and answers AddPartitionsToTxn with
     * {@code TOPIC_AUTHORIZATION_FAILED} for one and {@code OPERATION_NOT_ATTEMPTED} for the
     * other. Neither partition may end up pending or added, the resulting
     * {@code TopicAuthorizationException} must list only the unauthorized topic, and the error
     * must be abortable.
     */
    @Test
    public void testTopicAuthorizationFailureInAddPartitions() {
        final long pid = 13131L;
        final short epoch = 1;
        // Distinct local names so the fixture fields tp0/tp1 are not shadowed.
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        final TopicPartition notAttemptedPartition = new TopicPartition("bar", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        transactionManager.maybeAddPartitionToTransaction(notAttemptedPartition);

        Map<TopicPartition, Errors> errors = new HashMap<>();
        errors.put(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        errors.put(notAttemptedPartition, Errors.OPERATION_NOT_ATTEMPTED);
        prepareAddPartitionsToTxn(errors);
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(unauthorizedPartition));
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(notAttemptedPartition));
        Assert.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse(transactionManager.isPartitionAdded(notAttemptedPartition));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
        Assert.assertEquals(Collections.singleton(unauthorizedPartition.topic()), exception.unauthorizedTopics());
        assertAbortableError(TopicAuthorizationException.class);
    }

    /**
     * Hits an abortable error (topic authorization failure) on the only partition of a
     * transaction, aborts, and then checks that a fresh transaction on another partition can be
     * produced to and committed.
     *
     * @throws Exception if retrieving the produce result fails
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;

        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasAbortableError());

        // Abort: the pending record is failed.
        transactionManager.beginAbort();
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        assertFutureFailed((Future<RecordMetadata>) responseFuture);

        // No EndTxn response is prepared here; after one more pass the manager is ready again.
        sender.runOnce();
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(accumulator.hasIncomplete());

        // A new transaction on the fixture partition goes through to commit.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        Assert.assertNotNull(responseFuture.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Hits an abortable error (topic authorization failure) after one partition has already been
     * added to the transaction, aborts, and then checks that a fresh transaction can be produced
     * to and committed.
     *
     * @throws Exception if retrieving the produce result fails
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);

        // NOTE(review): despite the variable name, this record is appended to unauthorizedPartition,
        // not tp0. Behaviour is kept as-is; confirm whether tp0 was intended.
        FutureRecordMetadata authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));

        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        sender.runOnce();

        // The error is abortable; tp0 stays added and neither record has been resolved yet.
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse(authorizedTopicProduceFuture.isDone());
        Assert.assertFalse(unauthorizedTopicProduceFuture.isDone());

        // Abort: both pending records are failed and the manager becomes ready again.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        transactionManager.beginAbort();
        sender.runOnce();
        assertFutureFailed((Future<RecordMetadata>) authorizedTopicProduceFuture);
        assertFutureFailed((Future<RecordMetadata>) unauthorizedTopicProduceFuture);
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(accumulator.hasIncomplete());

        // A new transaction on the fixture partition goes through to commit.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(nextTransactionFuture.isDone());
        Assert.assertNotNull(nextTransactionFuture.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Exercises recovery when an abortable error is raised while an earlier produce request
     * is still waiting to be retried.
     *
     * <p>The produce to {@code tp0} first receives {@code REQUEST_TIMED_OUT}; adding a second
     * partition then fails with {@code TOPIC_AUTHORIZATION_FAILED}. The test asserts that the
     * pending {@code tp0} produce still completes, the unauthorized produce fails, the abort
     * brings the manager back to the ready state, and a follow-up transaction commits.
     */
    @Test
    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));

        // The first produce attempt is answered with REQUEST_TIMED_OUT, so the batch stays incomplete.
        accumulator.beginFlush();
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
        sender.runOnce();
        Assert.assertFalse(authorizedTopicProduceFuture.isDone());
        Assert.assertTrue(accumulator.hasIncomplete());

        // Adding the second partition is rejected, which must leave an abortable error behind.
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition,
                time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse(authorizedTopicProduceFuture.isDone());

        // A successful produce response now completes the tp0 future; the unauthorized one fails.
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        assertFutureFailed(unauthorizedTopicProduceFuture);
        Assert.assertTrue(authorizedTopicProduceFuture.isDone());
        Assert.assertNotNull(authorizedTopicProduceFuture.get());
        Assert.assertTrue(authorizedTopicProduceFuture.isDone());

        // Abort the failed transaction.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        transactionManager.beginAbort();
        sender.runOnce();
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(accumulator.hasIncomplete());

        // A fresh transaction on tp0 must be able to produce and commit.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> nextTransactionFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        sender.runOnce();
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(nextTransactionFuture.isDone());
        Assert.assertNotNull(nextTransactionFuture.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Checks that a {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} answer to AddPartitionsToTxn
     * is recorded as a {@link TransactionalIdAuthorizationException} and is treated as fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition tp = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp);

        prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
        sender.runOnce();

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Checks that a commit requested while a partition is still pending first gets the
     * partition added and the record produced, and only then completes via EndTxn.
     */
    @Test
    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        // Commit is requested before the partition has been added to the transaction.
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));

        // First sender pass: the partition becomes part of the transaction, nothing else finishes.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.runOnce();
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(responseFuture.isDone());
        Assert.assertFalse(commitResult.isCompleted());

        // Second sender pass: the produce future completes.
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());

        // The commit is still outstanding until the next sender pass handles EndTxn.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        Assert.assertFalse(commitResult.isCompleted());
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        Assert.assertTrue(transactionManager.isCompleting());

        sender.runOnce();
        Assert.assertTrue(commitResult.isCompleted());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Checks that when a second partition is registered before the first produce goes out,
     * both AddPartitionsToTxn round trips complete before any produce future is done.
     */
    @Test
    public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        sender.runOnce();
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));

        // tp1 is registered with the transaction, but note the second record is appended to tp0.
        transactionManager.maybeAddPartitionToTransaction(tp1);
        Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
        prepareProduceResponse(Errors.NONE, pid, epoch);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp1));
        Assert.assertFalse(responseFuture.isDone());
        Assert.assertFalse(secondResponseFuture.isDone());

        // This pass only adds tp1; neither produce future may be complete yet.
        sender.runOnce();
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp1));
        Assert.assertFalse(responseFuture.isDone());
        Assert.assertFalse(secondResponseFuture.isDone());

        // The following pass completes both futures.
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        Assert.assertTrue(secondResponseFuture.isDone());
    }

    /**
     * Checks that an {@code INVALID_PRODUCER_EPOCH} produce response fails the record future
     * with a {@link ProducerFencedException} cause and that every subsequent transactional
     * call on the manager throws {@link ProducerFencedException} directly.
     */
    @Test
    public void testProducerFencedException() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, epoch);
        // Two sender passes consume the two queued responses.
        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue(responseFuture.isDone());
        Assert.assertTrue(transactionManager.hasError());

        // Same assertThrows idiom as the checks below, instead of a hand-rolled try/fail/catch.
        ExecutionException produceFailure = Assert.assertThrows(
                "Expected to get a ExecutionException from the response",
                ExecutionException.class, () -> responseFuture.get());
        Assert.assertTrue(produceFailure.getCause() instanceof ProducerFencedException);

        // The fenced state must surface directly from every follow-up call.
        Assert.assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
        Assert.assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
        Assert.assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
        Assert.assertThrows(ProducerFencedException.class,
                () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), "dummyId"));
    }

    /**
     * Checks that a commit cannot succeed after a produce request fails with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER}: the commit result completes with a
     * {@link KafkaException}, the record future fails with an
     * {@link OutOfOrderSequenceException} cause, and a subsequent abort succeeds.
     */
    @Test
    public void testDisallowCommitOnProduceFailure() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        Assert.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);

        sender.runOnce();
        Assert.assertFalse(commitResult.isCompleted());
        sender.runOnce();
        sender.runOnce();
        Assert.assertTrue(commitResult.isCompleted());

        // Awaiting the commit must raise; assertThrows replaces the former empty catch block.
        Assert.assertThrows(KafkaException.class, () -> commitResult.await());

        ExecutionException produceFailure = Assert.assertThrows(
                "Expected produce future to raise an exception",
                ExecutionException.class, () -> responseFuture.get());
        Assert.assertTrue(produceFailure.getCause() instanceof OutOfOrderSequenceException);

        // Aborting is still permitted and returns the manager to the ready state.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Checks that an abort succeeds after a produce request has failed with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER}.
     */
    @Test
    public void testAllowAbortOnProduceFailure() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        // Queue all three responses up front: add-partitions, the failing produce, and the abort.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);

        sender.runOnce();
        sender.runOnce();

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        sender.runOnce();

        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Checks that a produce failure arriving after an abort has already begun does not put
     * the manager into an error state and that the abort still completes successfully.
     */
    @Test
    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.runOnce();
        sender.runOnce();

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        Assert.assertTrue(transactionManager.isAborting());
        Assert.assertFalse(transactionManager.hasError());

        // The produce failure is delivered while the abort is in progress.
        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(transactionManager.isAborting());
        Assert.assertFalse(transactionManager.hasError());

        sender.runOnce();
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Checks that beginning a commit drains a still-undrained batch and that no transactional
     * request is in flight until the produce response has been received.
     */
    @Test
    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        sender.runOnce();
        Assert.assertTrue(accumulator.hasUndrained());

        // After beginCommit the batch is drained but remains incomplete.
        transactionManager.beginCommit();
        sender.runOnce();
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(responseFuture.isDone());

        // Another pass without a produce response changes nothing.
        sender.runOnce();
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(responseFuture.isDone());

        // The produce response arrives and completes the batch.
        sendProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertFalse(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());

        // Only now does a transactional request go in flight; its response finishes the commit.
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasInFlightTransactionalRequest());
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Checks that a commit begun while a produce batch is drained but incomplete keeps all
     * transactional requests out of flight until the produce response has been received.
     */
    @Test
    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        sender.runOnce();
        Assert.assertTrue(accumulator.hasUndrained());

        // An explicit flush drains the batch before the commit is requested.
        accumulator.beginFlush();
        sender.runOnce();
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());

        // With the batch still incomplete, beginCommit must not put a request in flight.
        transactionManager.beginCommit();
        sender.runOnce();
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(responseFuture.isDone());

        // Another pass without a produce response changes nothing.
        sender.runOnce();
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(responseFuture.isDone());

        // The produce response arrives and completes the batch.
        sendProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertFalse(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());

        // Only now does a transactional request go in flight; its response finishes the commit.
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasInFlightTransactionalRequest());
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.runOnce();
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Checks that the transaction coordinator can still be rediscovered while the manager is
     * in the abortable-error state, and that the error state is preserved afterwards.
     */
    @Test
    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());
        sender.runOnce();

        // Force the abortable-error state, then deliver NOT_COORDINATOR for the add-partitions call.
        transactionManager.transitionToAbortableError(new KafkaException());
        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasAbortableError());

        // The coordinator is now unknown; a FindCoordinator response must restore it.
        Assert.assertNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Checks that aborting before anything has been sent completes the abort in a single
     * sender pass and fails the pending record future with a {@link KafkaException} cause.
     */
    @Test
    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        // No broker responses are prepared: the abort must complete without them.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        sender.runOnce();
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());

        // assertThrows replaces the try/fail/catch idiom, matching its use elsewhere in this file.
        ExecutionException produceFailure = Assert.assertThrows(
                "Expected produce future to raise an exception",
                ExecutionException.class, () -> responseFuture.get());
        Assert.assertTrue(produceFailure.getCause() instanceof KafkaException);
    }

    /**
     * Checks that an abort begun after AddPartitionsToTxn returned
     * {@code UNKNOWN_TOPIC_OR_PARTITION} still completes successfully once the add-partitions
     * and EndTxn responses are available, and that the pending record future fails with a
     * {@link KafkaException} cause.
     */
    @Test
    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
        final long producerId = 13131L;
        final short producerEpoch = 1;

        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;

        sender.runOnce();
        Assert.assertFalse(responseFuture.isDone());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);

        // Two sender passes consume the two queued responses.
        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());

        // assertThrows replaces the try/fail/catch idiom, matching its use elsewhere in this file.
        ExecutionException produceFailure = Assert.assertThrows(
                "Expected produce future to raise an exception",
                ExecutionException.class, () -> responseFuture.get());
        Assert.assertTrue(produceFailure.getCause() instanceof KafkaException);
    }

    /**
     * Checks that a produce request answered with {@code REQUEST_TIMED_OUT} is still
     * completed successfully after an abort has begun, and that the abort itself succeeds.
     */
    @Test
    public void testAbortResendsProduceRequestIfRetried() throws Exception {
        final long producerId = 13131L;
        final short producerEpoch = 1;

        doInitTransactions(producerId, producerEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;

        // After both queued responses are consumed the record is still pending.
        sender.runOnce();
        sender.runOnce();
        Assert.assertFalse(responseFuture.isDone());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);

        sender.runOnce();
        sender.runOnce();

        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());

        // The record future completes normally and reports the topic it was sent to.
        RecordMetadata recordMetadata = responseFuture.get();
        Assert.assertEquals(tp0.topic(), recordMetadata.topic());
    }

    /**
     * Checks that {@code UNKNOWN_TOPIC_OR_PARTITION} on AddPartitionsToTxn leaves the
     * partition out of the transaction until a later successful response, after which the
     * produce completes.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        // First attempt fails: the partition must not be part of the transaction.
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
        sender.runOnce();
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));

        // Second attempt succeeds.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));

        sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
    }

    /**
     * Runs the shared retriable TxnOffsetCommit scenario with
     * {@code UNKNOWN_TOPIC_OR_PARTITION} as the partition-level error.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
        final Errors retriableError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
        testRetriableErrorInTxnOffsetCommit(retriableError);
    }

    /**
     * Runs the shared retriable TxnOffsetCommit scenario with
     * {@code COORDINATOR_LOAD_IN_PROGRESS} as the partition-level error.
     */
    @Test
    public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
        final Errors retriableError = Errors.COORDINATOR_LOAD_IN_PROGRESS;
        testRetriableErrorInTxnOffsetCommit(retriableError);
    }

    /**
     * Shared scenario: a TxnOffsetCommit response carries {@code error} for one partition;
     * the offset commit must stay pending and only succeed after a later response in which
     * every partition reports {@code Errors.NONE}.
     *
     * @param error the partition-level error returned for {@code tp1} on the first
     *              TxnOffsetCommit response
     */
    private void testRetriableErrorInTxnOffsetCommit(Errors error) {
        final long pid = 13131L;
        final short epoch = 1;
        final String consumerGroupId = "myconsumergroup";

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp0, new OffsetAndMetadata(1L));
        offsets.put(tp1, new OffsetAndMetadata(1L));

        TransactionalRequestResult addOffsetsResult =
                transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        sender.runOnce();
        Assert.assertFalse(addOffsetsResult.isCompleted());

        // First TxnOffsetCommit response: tp0 succeeds, tp1 reports the supplied error.
        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
        txnOffsetCommitResponse.put(tp0, Errors.NONE);
        txnOffsetCommitResponse.put(tp1, error);

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);

        // The group coordinator is unknown at first and is known after two sender passes.
        Assert.assertNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        sender.runOnce();
        sender.runOnce();
        Assert.assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue(transactionManager.hasPendingOffsetCommits());

        // The failing response keeps the offset commit pending.
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse(addOffsetsResult.isCompleted());

        // A fully successful response completes the operation.
        txnOffsetCommitResponse.put(tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
        sender.runOnce();
        Assert.assertTrue(addOffsetsResult.isCompleted());
        Assert.assertTrue(addOffsetsResult.isSuccessful());
    }

    /**
     * Delegates to the shared partition-level failure check using
     * {@code TOPIC_AUTHORIZATION_FAILED}.
     */
    @Test
    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
        final Errors partitionError = Errors.TOPIC_AUTHORIZATION_FAILED;
        verifyAddPartitionsFailsWithPartitionLevelError(partitionError);
    }

    /**
     * Checks that an abort after a failed AddPartitionsToTxn
     * ({@code TOPIC_AUTHORIZATION_FAILED}) completes successfully even though no EndTxn
     * response has been prepared.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid);
        sender.runOnce();

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        Assert.assertFalse(abortResult.isCompleted());

        // No EndTxn response is queued; the abort must finish on its own.
        sender.runOnce();
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
    }

    /**
     * Checks that an abort after a failed AddOffsetsToTxn
     * ({@code GROUP_AUTHORIZATION_FAILED}) completes successfully even though no EndTxn
     * response has been prepared, and that the manager returns to the ready state.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() {
        final long pid = 13131L;
        final short epoch = 1;
        final String consumerGroupId = "myconsumergroup";

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);

        // The first pass handles the failed AddOffsetsToTxn; the abort is not yet complete.
        sender.runOnce();
        Assert.assertFalse(abortResult.isCompleted());

        // The second pass completes the abort although no EndTxn response was queued.
        sender.runOnce();
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
    }

    /**
     * Checks that a pending abort is completed unsuccessfully, and the transaction manager
     * reports a fatal error, when AddOffsetsToTxn is answered with
     * {@code UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        final String consumerGroupId = "myconsumergroup";
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        this.transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
        // The abort is requested before the AddOffsetsToTxn response has been processed.
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, consumerGroupId, pid, epoch);
        this.sender.runOnce();
        Assert.assertFalse(abortResult.isCompleted());
        this.sender.runOnce();
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertFalse(abortResult.isSuccessful());
        Assert.assertTrue(this.transactionManager.hasFatalError());
    }

    /**
     * Checks that the accumulator drains nothing for partitions that were registered with
     * the transaction manager but for which no sender pass has run yet, and that the
     * transaction manager does not enter an error state as a result.
     */
    @Test
    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        // The sender has not run since the partitions were registered.
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Node node1 = new Node(0, "localhost", 1111);
        Node node2 = new Node(1, "localhost", 1112);
        PartitionInfo part1 = new PartitionInfo("test", 0, node1, null, null);
        PartitionInfo part2 = new PartitionInfo("test", 1, node2, null, null);
        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> nodes = new HashSet<>();
        nodes.add(node1);
        nodes.add(node2);
        Map<Integer, List<ProducerBatch>> drainedBatches = this.accumulator.drain(cluster, nodes, Integer.MAX_VALUE, this.time.milliseconds());
        // Both nodes get an entry in the result, but each entry must be empty.
        Assert.assertTrue(drainedBatches.containsKey(node1.id()));
        Assert.assertTrue(drainedBatches.get(node1.id()).isEmpty());
        Assert.assertTrue(drainedBatches.containsKey(node2.id()));
        Assert.assertTrue(drainedBatches.get(node2.id()).isEmpty());
        Assert.assertFalse(this.transactionManager.hasError());
    }

    /**
     * Checks that a partition already added to the transaction can still be drained after
     * a later AddPartitionsToTxn for a different partition fails with
     * {@code TOPIC_AUTHORIZATION_FAILED} and puts the manager into an abortable error state.
     */
    @Test
    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        // tp1 is added successfully.
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        this.prepareAddPartitionsToTxn(this.tp1, Errors.NONE);
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp1));
        // tp0 is rejected, which triggers the abortable error.
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxn(this.tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Node node1 = new Node(1, "localhost", 1112);
        PartitionInfo part1 = new PartitionInfo("test", 1, node1, null, null);
        Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1), Collections.emptySet(), Collections.emptySet());
        this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        Map<Integer, List<ProducerBatch>> drainedBatches = this.accumulator.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, this.time.milliseconds());
        // Exactly one batch is drained for the node; the error state is unchanged.
        Assert.assertTrue(drainedBatches.containsKey(node1.id()));
        Assert.assertEquals(1, drainedBatches.get(node1.id()).size());
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    /**
     * Appends a record for a partition that was never registered with the transaction
     * manager and checks that draining the accumulator yields an empty batch list for
     * the partition's node.
     */
    @Test
    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        // Note: maybeAddPartitionToTransaction is deliberately not called for tp0 here.
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        Node node1 = new Node(0, "localhost", 1111);
        PartitionInfo part1 = new PartitionInfo("test", 0, node1, null, null);
        Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> nodes = new HashSet<>();
        nodes.add(node1);
        Map<Integer, List<ProducerBatch>> drainedBatches = this.accumulator.drain(cluster, nodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertTrue(drainedBatches.containsKey(node1.id()));
        Assert.assertTrue(drainedBatches.get(node1.id()).isEmpty());
    }

    /**
     * Checks that a produce request which first failed with
     * {@code NOT_LEADER_FOR_PARTITION} is sent again and completes successfully even
     * after the transaction manager has been moved into an abortable error state.
     */
    @Test
    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, pid);
        this.prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
        // Two passes consume the two prepared responses (add-partitions, then the failed produce).
        this.sender.runOnce();
        this.sender.runOnce();
        Assert.assertFalse(responseFuture.isDone());
        this.transactionManager.transitionToAbortableError(new KafkaException());
        this.prepareProduceResponse(Errors.NONE, pid, epoch);
        this.sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        Assert.assertNotNull(responseFuture.get());
    }

    /**
     * Checks that when a queued batch fails with a {@link TimeoutException} after the mock
     * clock is advanced and the broker is made unreachable, the transaction manager ends
     * up in an abortable error state.
     */
    @Test
    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, pid);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(responseFuture.isDone());
        // Advance the mock clock and cut the connection so the queued batch cannot be sent.
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.metadata.fetch().nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.client.blackout(clusterNode, 100L);
        this.sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        try {
            responseFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    /**
     * Two-partition variant of the batch-expiry scenario: both queued batches must fail
     * with a {@link TimeoutException} once the mock clock is advanced and the broker is
     * made unreachable, and the transaction manager must end up in an abortable error state.
     */
    @Test
    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        FutureRecordMetadata firstBatchResponse = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        FutureRecordMetadata secondBatchResponse = this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(firstBatchResponse.isDone());
        Assert.assertFalse(secondBatchResponse.isDone());
        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(this.tp0, Errors.NONE);
        partitionErrors.put(this.tp1, Errors.NONE);
        this.prepareAddPartitionsToTxn(partitionErrors);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp1));
        // Fixed: the original asserted tp1 twice and never re-checked tp0 after the sender pass.
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Assert.assertFalse(firstBatchResponse.isDone());
        Assert.assertFalse(secondBatchResponse.isDone());
        // Advance the mock clock and cut the connection so the queued batches cannot be sent.
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.metadata.fetch().nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.client.blackout(clusterNode, 100L);
        this.sender.runOnce();
        Assert.assertTrue(firstBatchResponse.isDone());
        Assert.assertTrue(secondBatchResponse.isDone());
        try {
            firstBatchResponse.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        try {
            secondBatchResponse.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        Assert.assertTrue(this.transactionManager.hasAbortableError());
    }

    /**
     * Checks that a commit requested while a batch is still queued is completed
     * unsuccessfully once that batch fails with a {@link TimeoutException}, that the
     * transaction stays open in an abortable error state, and that a following abort
     * (answered with a successful EndTxn response) closes the transaction.
     */
    @Test
    public void testDropCommitOnBatchExpiry() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, pid);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(responseFuture.isDone());
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        // Advance the mock clock and drop the connection so the queued batch cannot be sent.
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.metadata.fetch().nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        try {
            responseFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        this.sender.runOnce();
        // The commit was dropped, but the transaction itself is still open and abortable.
        Assert.assertTrue(commitResult.isCompleted());
        Assert.assertFalse(commitResult.isSuccessful());
        Assert.assertTrue(this.transactionManager.hasAbortableError());
        Assert.assertTrue(this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse(this.transactionManager.isCompleting());
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        this.sender.runOnce();
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    /**
     * Checks that when a batch that already received one produce response
     * ({@code NOT_LEADER_FOR_PARTITION}) later fails with a {@link TimeoutException},
     * a pending commit is completed unsuccessfully, the transaction manager reports a
     * fatal error, and no transaction remains ongoing.
     */
    @Test
    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, pid);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        // First produce attempt is answered with an error that leaves the future pending.
        this.prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
        this.sender.runOnce();
        Assert.assertFalse(responseFuture.isDone());
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        // Advance the mock clock and cut the connection so the batch cannot be re-sent.
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.metadata.fetch().nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.client.blackout(clusterNode, 100L);
        this.sender.runOnce();
        Assert.assertTrue(responseFuture.isDone());
        try {
            responseFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof TimeoutException);
        }
        this.sender.runOnce();
        this.sender.runOnce();
        Assert.assertTrue(commitResult.isCompleted());
        Assert.assertFalse(commitResult.isSuccessful());
        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Exercises {@code resetProducerIdIfNeeded} on a standalone manager: the producer id
     * must be kept when nothing is unresolved or when the unresolved partition's only batch
     * was acknowledged, and must be dropped after an unresolved partition's batch fails.
     */
    @Test
    public void testResetProducerIdAfterWithoutPendingInflightRequests() {
        final long producerId = 15L;
        final short epoch = 5;
        final ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        final TransactionManager manager = new TransactionManager(logContext, null, 1121, 100L);
        manager.setProducerIdAndEpoch(initialIdAndEpoch);

        // Nothing in flight and nothing unresolved: the id is untouched.
        manager.resetProducerIdIfNeeded();
        Assert.assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());

        final TopicPartition partition = new TopicPartition("foo", 0);
        Assert.assertEquals(Integer.valueOf(0), manager.sequenceNumber(partition));
        final ProducerBatch ackedBatch = writeIdempotentBatchWithValue(manager, partition, "1");
        Assert.assertEquals(Integer.valueOf(1), manager.sequenceNumber(partition));
        manager.handleCompletedBatch(ackedBatch, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L));
        Assert.assertEquals(OptionalInt.of(0), manager.lastAckedSequence(partition));

        // Marked unresolved after a successful ack: the reset check clears the mark and keeps the id.
        manager.markSequenceUnresolved(partition);
        manager.resetProducerIdIfNeeded();
        Assert.assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
        Assert.assertFalse(manager.hasUnresolvedSequences());

        // Marked unresolved and the batch fails: the reset check drops the producer id.
        final ProducerBatch failedBatch = writeIdempotentBatchWithValue(manager, partition, "2");
        Assert.assertEquals(Integer.valueOf(2), manager.sequenceNumber(partition));
        manager.markSequenceUnresolved(partition);
        manager.handleFailedBatch(failedBatch, new TimeoutException(), false);
        Assert.assertTrue(manager.hasUnresolvedSequences());
        manager.resetProducerIdIfNeeded();
        Assert.assertFalse(manager.hasUnresolvedSequences());
        Assert.assertFalse(manager.hasProducerId());
    }

    /**
     * Writes three batches to one partition, fails the first two with a timeout and
     * completes the third successfully; the producer id and the next sequence number
     * must be unchanged after every {@code resetProducerIdIfNeeded} call.
     */
    @Test
    public void testNoProducerIdResetAfterLastInFlightBatchSucceeds() {
        final long producerId = 15L;
        final short epoch = 5;
        final ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        final TransactionManager manager = new TransactionManager(logContext, null, 1121, 100L);
        manager.setProducerIdAndEpoch(initialIdAndEpoch);

        final TopicPartition partition = new TopicPartition("foo", 0);
        final ProducerBatch first = writeIdempotentBatchWithValue(manager, partition, "1");
        final ProducerBatch second = writeIdempotentBatchWithValue(manager, partition, "2");
        final ProducerBatch third = writeIdempotentBatchWithValue(manager, partition, "3");
        Assert.assertEquals(3L, manager.sequenceNumber(partition).intValue());

        // First batch times out while later batches are still outstanding.
        manager.markSequenceUnresolved(partition);
        manager.handleFailedBatch(first, new TimeoutException(), false);
        Assert.assertTrue(manager.hasUnresolvedSequences());
        manager.resetProducerIdIfNeeded();
        Assert.assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
        Assert.assertTrue(manager.hasUnresolvedSequences());

        // Second batch times out as well; the third is still outstanding.
        manager.handleFailedBatch(second, new TimeoutException(), false);
        manager.resetProducerIdIfNeeded();
        Assert.assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
        Assert.assertTrue(manager.hasUnresolvedSequences());

        // The last batch succeeds: the unresolved mark clears and the id is kept.
        manager.handleCompletedBatch(third, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L));
        manager.resetProducerIdIfNeeded();
        Assert.assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
        Assert.assertFalse(manager.hasUnresolvedSequences());
        Assert.assertEquals(3L, manager.sequenceNumber(partition).intValue());
    }

    /**
     * Writes three batches to one partition, fails the first, completes the second and
     * fails the third; only after the last failure must {@code resetProducerIdIfNeeded}
     * drop the producer id and return the partition's sequence number to zero.
     */
    @Test
    public void testProducerIdResetAfterLastInFlightBatchFails() {
        final long producerId = 15L;
        final short epoch = 5;
        final ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(producerId, epoch);
        final TransactionManager manager = new TransactionManager(logContext, null, 1121, 100L);
        manager.setProducerIdAndEpoch(initialIdAndEpoch);

        final TopicPartition partition = new TopicPartition("foo", 0);
        final ProducerBatch first = writeIdempotentBatchWithValue(manager, partition, "1");
        final ProducerBatch second = writeIdempotentBatchWithValue(manager, partition, "2");
        final ProducerBatch third = writeIdempotentBatchWithValue(manager, partition, "3");
        Assert.assertEquals(Integer.valueOf(3), manager.sequenceNumber(partition));

        manager.markSequenceUnresolved(partition);
        manager.handleFailedBatch(first, new TimeoutException(), false);
        Assert.assertTrue(manager.hasUnresolvedSequences());

        // A success in the middle does not resolve the partition while the third batch is outstanding.
        manager.handleCompletedBatch(second, new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L));
        manager.resetProducerIdIfNeeded();
        Assert.assertEquals(initialIdAndEpoch, manager.producerIdAndEpoch());
        Assert.assertTrue(manager.hasUnresolvedSequences());

        // The final batch fails: the producer id is dropped and sequencing restarts.
        manager.handleFailedBatch(third, new TimeoutException(), false);
        manager.resetProducerIdIfNeeded();
        Assert.assertFalse(manager.hasProducerId());
        Assert.assertFalse(manager.hasUnresolvedSequences());
        Assert.assertEquals(0L, manager.sequenceNumber(partition).intValue());
    }

    /** Runs the retry scenario with an abort as both the first attempt and the retry. */
    @Test
    public void testRetryAbortTransaction() throws InterruptedException {
        final TransactionResult firstAttempt = TransactionResult.ABORT;
        final TransactionResult retryAttempt = TransactionResult.ABORT;
        verifyCommitOrAbortTranscationRetriable(firstAttempt, retryAttempt);
    }

    /** Runs the retry scenario with a commit as both the first attempt and the retry. */
    @Test
    public void testRetryCommitTransaction() throws InterruptedException {
        final TransactionResult firstAttempt = TransactionResult.COMMIT;
        final TransactionResult retryAttempt = TransactionResult.COMMIT;
        verifyCommitOrAbortTranscationRetriable(firstAttempt, retryAttempt);
    }

    /**
     * Runs the retry scenario with a commit first and an abort as the retry; the test
     * passes only if a {@link KafkaException} is thrown.
     */
    @Test(expected=KafkaException.class)
    public void testRetryAbortTransactionAfterCommitTimeout() throws InterruptedException {
        final TransactionResult firstAttempt = TransactionResult.COMMIT;
        final TransactionResult retryAttempt = TransactionResult.ABORT;
        verifyCommitOrAbortTranscationRetriable(firstAttempt, retryAttempt);
    }

    /**
     * Runs the retry scenario with an abort first and a commit as the retry; the test
     * passes only if a {@link KafkaException} is thrown.
     */
    @Test(expected=KafkaException.class)
    public void testRetryCommitTransactionAfterAbortTimeout() throws InterruptedException {
        final TransactionResult firstAttempt = TransactionResult.ABORT;
        final TransactionResult retryAttempt = TransactionResult.COMMIT;
        verifyCommitOrAbortTranscationRetriable(firstAttempt, retryAttempt);
    }

    /**
     * Shared scenario for retrying the end of a transaction. A record is produced
     * successfully, the first commit/abort is issued and its EndTxn response is delivered
     * with a disconnect, the caller's wait on the result times out, the coordinator is
     * looked up again, and the commit/abort is then issued a second time.
     *
     * @param firstTransactionResult whether the first attempt is a commit or an abort
     * @param retryTransactionResult whether the retry is a commit or an abort
     * @throws InterruptedException declared for the blocking accumulator/result calls
     */
    private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult, TransactionResult retryTransactionResult) throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, pid);
        this.prepareProduceResponse(Errors.NONE, pid, epoch);
        // Two passes consume the add-partitions and produce responses.
        this.sender.runOnce();
        this.sender.runOnce();
        TransactionalRequestResult result = firstTransactionResult == TransactionResult.COMMIT ? this.transactionManager.beginCommit() : this.transactionManager.beginAbort();
        // shouldDisconnect=true: the first EndTxn exchange is cut off.
        this.prepareEndTxnResponse(Errors.NONE, firstTransactionResult, pid, epoch, true);
        this.sender.runOnce();
        Assert.assertFalse(result.isCompleted());
        try {
            result.await(1000L, TimeUnit.MILLISECONDS);
            Assert.fail("Should have raised TimeoutException");
        }
        catch (TimeoutException expected) {
            // Expected: the result is still incomplete, so the bounded wait times out.
        }
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.runOnce();
        TransactionalRequestResult retryResult = retryTransactionResult == TransactionResult.COMMIT ? this.transactionManager.beginCommit() : this.transactionManager.beginAbort();
        // The retry must hand back a result equal to the first attempt's result.
        Assert.assertEquals(retryResult, result);
        this.prepareEndTxnResponse(Errors.NONE, retryTransactionResult, pid, epoch, false);
        this.sender.runOnce();
        Assert.assertTrue(retryResult.isCompleted());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Shared scenario: registers {@code tp0}, queues a record, answers AddPartitionsToTxn
     * with the given partition-level error, and checks that the transaction manager
     * reports an error and does not list the partition as part of the transaction.
     *
     * @param error the partition-level error returned for {@code tp0}
     * @throws InterruptedException declared for the blocking accumulator append
     */
    private void verifyAddPartitionsFailsWithPartitionLevelError(Errors error) throws InterruptedException {
        final long pid = 1L;
        final short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());
        this.prepareAddPartitionsToTxn(this.tp0, error);
        this.sender.runOnce();
        Assert.assertTrue(this.transactionManager.hasError());
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    /**
     * Queues an AddPartitionsToTxn response carrying the given per-partition errors. The
     * request matcher asserts that the request names exactly the partitions that are keys
     * of {@code errors} (order-insensitive).
     *
     * @param errors the error to report for each partition expected in the request
     */
    private void prepareAddPartitionsToTxn(Map<TopicPartition, Errors> errors) {
        this.client.prepareResponse(body -> {
            AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest)body;
            // Expected value first, so a failure message reports the two sides correctly.
            Assert.assertEquals(new HashSet<>(errors.keySet()), new HashSet<>(request.partitions()));
            return true;
        }, new AddPartitionsToTxnResponse(0, errors));
    }

    /**
     * Single-partition convenience overload: wraps the pair in a one-entry map and
     * delegates to the map-based variant.
     *
     * @param tp    the partition expected in the request
     * @param error the error to report for that partition
     */
    private void prepareAddPartitionsToTxn(TopicPartition tp, Errors error) {
        final Map<TopicPartition, Errors> singleEntry = Collections.singletonMap(tp, error);
        prepareAddPartitionsToTxn(singleEntry);
    }

    /**
     * Queues a FindCoordinator response that points at {@code brokerNode}. The request
     * matcher asserts the coordinator type and key carried by the request.
     *
     * @param error            the error code placed in the response
     * @param shouldDisconnect forwarded to the mock client's {@code prepareResponse}
     * @param coordinatorType  the coordinator type the request is expected to ask for
     * @param coordinatorKey   the coordinator key the request is expected to carry
     */
    private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect, FindCoordinatorRequest.CoordinatorType coordinatorType, String coordinatorKey) {
        this.client.prepareResponse(body -> {
            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest)body;
            // Expected value first, so a failure message reports the two sides correctly.
            Assert.assertEquals(coordinatorType, FindCoordinatorRequest.CoordinatorType.forId((byte)findCoordinatorRequest.data().keyType()));
            Assert.assertEquals(coordinatorKey, findCoordinatorRequest.data().key());
            return true;
        }, FindCoordinatorResponse.prepareResponse(error, this.brokerNode), shouldDisconnect);
    }

    /**
     * Queues an InitProducerId response with the given error, producer id and epoch. The
     * request matcher asserts that the request carries transactional id {@code "foobar"}
     * and a transaction timeout of 1121 ms.
     *
     * @param error            the error code placed in the response
     * @param shouldDisconnect forwarded to the mock client's {@code prepareResponse}
     * @param producerId       the producer id placed in the response
     * @param producerEpoch    the producer epoch placed in the response
     */
    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
        InitProducerIdResponseData responseData = new InitProducerIdResponseData().setErrorCode(error.code()).setProducerEpoch(producerEpoch).setProducerId(producerId).setThrottleTimeMs(0);
        this.client.prepareResponse(body -> {
            InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest)body;
            // Expected value first, so a failure message reports the two sides correctly.
            Assert.assertEquals("foobar", initProducerIdRequest.data.transactionalId());
            Assert.assertEquals(1121L, (long)initProducerIdRequest.data.transactionTimeoutMs());
            return true;
        }, new InitProducerIdResponse(responseData), shouldDisconnect);
    }

    /**
     * Passes a produce response for {@code tp0} (offset 0, throttle 0) with the given error
     * to the mock client's {@code respond}, guarded by the produce-request matcher.
     *
     * @param error         the error placed in the produce response
     * @param producerId    the producer id the matched request must carry
     * @param producerEpoch the producer epoch the matched request must carry
     */
    private void sendProduceResponse(Errors error, long producerId, short producerEpoch) {
        final MockClient.RequestMatcher matcher = produceRequestMatcher(producerId, producerEpoch);
        client.respond(matcher, produceResponse(tp0, 0L, error, 0));
    }

    /**
     * Queues a produce response for {@code tp0} (offset 0, throttle 0) with the given error
     * on the mock client, guarded by the produce-request matcher.
     *
     * @param error         the error placed in the produce response
     * @param producerId    the producer id the matched request must carry
     * @param producerEpoch the producer epoch the matched request must carry
     */
    private void prepareProduceResponse(Errors error, long producerId, short producerEpoch) {
        final MockClient.RequestMatcher matcher = produceRequestMatcher(producerId, producerEpoch);
        client.prepareResponse(matcher, produceResponse(tp0, 0L, error, 0));
    }

    /**
     * Builds a matcher for produce requests. It asserts that the request holds exactly one
     * record batch for {@code tp0}, that the batch is transactional and carries the given
     * producer id and epoch, and that the request's transactional id is {@code "foobar"}.
     *
     * @param pid   the expected producer id
     * @param epoch the expected producer epoch
     * @return a matcher that returns {@code true} once all assertions have passed
     */
    private MockClient.RequestMatcher produceRequestMatcher(long pid, short epoch) {
        return request -> {
            final ProduceRequest produce = (ProduceRequest)request;
            final MemoryRecords tp0Records = (MemoryRecords)produce.partitionRecordsOrFail().get(tp0);
            Assert.assertNotNull(tp0Records);

            // Exactly one batch is expected: one element present, then nothing more.
            final Iterator<?> remaining = tp0Records.batches().iterator();
            Assert.assertTrue(remaining.hasNext());
            final MutableRecordBatch onlyBatch = (MutableRecordBatch)remaining.next();
            Assert.assertFalse(remaining.hasNext());

            Assert.assertTrue(onlyBatch.isTransactional());
            Assert.assertEquals((long)pid, (long)onlyBatch.producerId());
            Assert.assertEquals((long)epoch, (long)onlyBatch.producerEpoch());
            Assert.assertEquals("foobar", produce.transactionalId());
            return true;
        };
    }

    /**
     * Queues an AddPartitionsToTxn response reporting {@code error} for a single partition,
     * guarded by the add-partitions request matcher.
     *
     * @param error          the error reported for the partition
     * @param topicPartition the single partition the request must name
     * @param epoch          the producer epoch the request must carry
     * @param pid            the producer id the request must carry
     */
    private void prepareAddPartitionsToTxnResponse(Errors error, TopicPartition topicPartition, short epoch, long pid) {
        final MockClient.RequestMatcher matcher = addPartitionsRequestMatcher(topicPartition, epoch, pid);
        client.prepareResponse(matcher, new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error)));
    }

    /**
     * Passes an AddPartitionsToTxn response reporting {@code error} for a single partition
     * to the mock client's {@code respond}, guarded by the add-partitions request matcher.
     *
     * @param error          the error reported for the partition
     * @param topicPartition the single partition the request must name
     * @param epoch          the producer epoch the request must carry
     * @param pid            the producer id the request must carry
     */
    private void sendAddPartitionsToTxnResponse(Errors error, TopicPartition topicPartition, short epoch, long pid) {
        final MockClient.RequestMatcher matcher = addPartitionsRequestMatcher(topicPartition, epoch, pid);
        client.respond(matcher, new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error)));
    }

    /**
     * Builds a matcher for AddPartitionsToTxn requests. It asserts the producer id and
     * epoch, that the request names exactly the one given partition, and that the
     * transactional id is {@code "foobar"}.
     *
     * @param topicPartition the only partition the request may contain
     * @param epoch          the expected producer epoch
     * @param pid            the expected producer id
     * @return a matcher that returns {@code true} once all assertions have passed
     */
    private MockClient.RequestMatcher addPartitionsRequestMatcher(TopicPartition topicPartition, short epoch, long pid) {
        return request -> {
            final AddPartitionsToTxnRequest addPartitions = (AddPartitionsToTxnRequest)request;
            Assert.assertEquals((long)pid, (long)addPartitions.producerId());
            Assert.assertEquals((long)epoch, (long)addPartitions.producerEpoch());
            Assert.assertEquals(Collections.singletonList(topicPartition), (Object)addPartitions.partitions());
            Assert.assertEquals("foobar", addPartitions.transactionalId());
            return true;
        };
    }

    /**
     * Queues an EndTxn response without a disconnect; delegates to the five-argument
     * overload with {@code shouldDisconnect = false}.
     *
     * @param error  the error placed in the response
     * @param result the commit/abort command the request must carry
     * @param pid    the producer id the request must carry
     * @param epoch  the producer epoch the request must carry
     */
    private void prepareEndTxnResponse(Errors error, TransactionResult result, long pid, short epoch) {
        final boolean shouldDisconnect = false;
        prepareEndTxnResponse(error, result, pid, epoch, shouldDisconnect);
    }

    /**
     * Queues an EndTxn response with the given error on the mock client, guarded by the
     * EndTxn request matcher.
     *
     * @param error            the error placed in the response
     * @param result           the commit/abort command the request must carry
     * @param pid              the producer id the request must carry
     * @param epoch            the producer epoch the request must carry
     * @param shouldDisconnect forwarded to the mock client's {@code prepareResponse}
     */
    private void prepareEndTxnResponse(Errors error, TransactionResult result, long pid, short epoch, boolean shouldDisconnect) {
        final MockClient.RequestMatcher matcher = endTxnMatcher(result, pid, epoch);
        client.prepareResponse(matcher, new EndTxnResponse(0, error), shouldDisconnect);
    }

    /**
     * Hands the mock client an {@code EndTxnResponse} through {@code respond}
     * (rather than {@code prepareResponse}), guarded by {@link #endTxnMatcher}.
     *
     * @param error  error carried by the response
     * @param result transaction result the matched request must carry
     * @param pid    producer id the matched request must carry
     * @param epoch  producer epoch the matched request must carry
     */
    private void sendEndTxnResponse(Errors error, TransactionResult result, long pid, short epoch) {
        MockClient.RequestMatcher matcher = endTxnMatcher(result, pid, epoch);
        AbstractResponse response = new EndTxnResponse(0, error);
        client.respond(matcher, response);
    }

    /**
     * Builds a matcher that casts the request body to {@code EndTxnRequest} and asserts
     * its transactional id, producer id, producer epoch and command. The matcher returns
     * {@code true} when every assertion passes; a mismatch surfaces as an assertion
     * failure rather than a {@code false} result.
     *
     * @param result expected value of the request's {@code command()}
     * @param pid    expected producer id
     * @param epoch  expected producer epoch
     * @return the asserting request matcher
     */
    private MockClient.RequestMatcher endTxnMatcher(TransactionResult result, long pid, short epoch) {
        return request -> {
            EndTxnRequest endTxn = (EndTxnRequest) request;
            Assert.assertEquals("foobar", endTxn.transactionalId());
            Assert.assertEquals((long) pid, (long) endTxn.producerId());
            Assert.assertEquals((long) epoch, (long) endTxn.producerEpoch());
            Assert.assertEquals(result, endTxn.command());
            return true;
        };
    }

    /**
     * Hands the mock client an {@code AddOffsetsToTxnResponse} through
     * {@code prepareResponse}. The accompanying matcher casts the request body to
     * {@code AddOffsetsToTxnRequest} and asserts its consumer group id, transactional
     * id, producer id and producer epoch.
     *
     * @param error           error carried by the prepared response
     * @param consumerGroupId consumer group id the matched request must carry
     * @param producerId      producer id the matched request must carry
     * @param producerEpoch   producer epoch the matched request must carry
     */
    private void prepareAddOffsetsToTxnResponse(Errors error, String consumerGroupId, long producerId, short producerEpoch) {
        MockClient.RequestMatcher matcher = request -> {
            AddOffsetsToTxnRequest addOffsets = (AddOffsetsToTxnRequest) request;
            Assert.assertEquals(consumerGroupId, addOffsets.consumerGroupId());
            Assert.assertEquals("foobar", addOffsets.transactionalId());
            Assert.assertEquals((long) producerId, (long) addOffsets.producerId());
            Assert.assertEquals((long) producerEpoch, (long) addOffsets.producerEpoch());
            return true;
        };
        AbstractResponse response = new AddOffsetsToTxnResponse(0, error);
        client.prepareResponse(matcher, response);
    }

    /**
     * Hands the mock client a {@code TxnOffsetCommitResponse} through
     * {@code prepareResponse}. The accompanying matcher casts the request body to
     * {@code TxnOffsetCommitRequest} and asserts its consumer group id, producer id
     * and producer epoch.
     *
     * @param consumerGroupId         consumer group id the matched request must carry
     * @param producerId              producer id the matched request must carry
     * @param producerEpoch           producer epoch the matched request must carry
     * @param txnOffsetCommitResponse per-partition errors passed to the response constructor
     */
    private void prepareTxnOffsetCommitResponse(String consumerGroupId, long producerId, short producerEpoch, Map<TopicPartition, Errors> txnOffsetCommitResponse) {
        MockClient.RequestMatcher matcher = body -> {
            TxnOffsetCommitRequest offsetCommit = (TxnOffsetCommitRequest) body;
            Assert.assertEquals(consumerGroupId, offsetCommit.consumerGroupId());
            Assert.assertEquals((long) producerId, (long) offsetCommit.producerId());
            Assert.assertEquals((long) producerEpoch, (long) offsetCommit.producerEpoch());
            return true;
        };
        AbstractResponse response = new TxnOffsetCommitResponse(0, txnOffsetCommitResponse);
        client.prepareResponse(matcher, response);
    }

    /**
     * Creates a {@code ProduceResponse} holding a single partition entry for {@code tp}.
     *
     * @param tp             partition the response entry is keyed by
     * @param offset         offset passed to the {@code PartitionResponse} constructor
     * @param error          error passed to the {@code PartitionResponse} constructor
     * @param throttleTimeMs throttle time passed to the {@code ProduceResponse} constructor
     * @return the assembled response
     */
    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
        // NOTE(review): -1L and 10L are presumably the log append time and log start
        // offset -- confirm against the PartitionResponse constructor.
        return new ProduceResponse(
                Collections.singletonMap(tp, new ProduceResponse.PartitionResponse(error, offset, -1L, 10L)),
                throttleTimeMs);
    }

    /**
     * Walks the transaction manager through initialization: calls
     * {@code initializeTransactions()}, prepares a successful transaction coordinator
     * lookup, prepares an init-producer-id response carrying {@code pid} and
     * {@code epoch}, and asserts after each stage that it took effect.
     *
     * @param pid   producer id handed to {@code prepareInitPidResponse}
     * @param epoch producer epoch handed to {@code prepareInitPidResponse}
     */
    private void doInitTransactions(long pid, short epoch) {
        transactionManager.initializeTransactions();

        // Stage 1: coordinator discovery. Two sender passes run before the assertion.
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        sender.runOnce();
        sender.runOnce();
        Assert.assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        // Stage 2: producer id acquisition. One sender pass runs before the assertion.
        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.runOnce();
        Assert.assertTrue(transactionManager.hasProducerId());
    }

    /**
     * Asserts that the transaction manager is in an error state that an abort clears:
     * {@code beginCommit()} must throw a {@code KafkaException} whose cause is an
     * instance of {@code cause}, {@code hasError()} must stay true until
     * {@code beginAbort()} is called, and must be false afterwards.
     *
     * @param cause expected type (or supertype) of the thrown exception's cause
     */
    private void assertAbortableError(Class<? extends RuntimeException> cause) {
        KafkaException raised = null;
        try {
            transactionManager.beginCommit();
        }
        catch (KafkaException e) {
            raised = e;
        }
        if (raised == null) {
            Assert.fail("Should have raised " + cause.getSimpleName());
        }
        Assert.assertTrue(cause.isAssignableFrom(raised.getCause().getClass()));
        Assert.assertTrue(transactionManager.hasError());

        // The error is still reported after the failed commit; beginAbort() clears it.
        Assert.assertTrue(transactionManager.hasError());
        transactionManager.beginAbort();
        Assert.assertFalse(transactionManager.hasError());
    }

    /**
     * Asserts that the transaction manager is in an error state that an abort does
     * not clear: {@code hasError()} must be true, and two consecutive calls to
     * {@code beginAbort()} must each throw a {@code KafkaException} whose cause is an
     * instance of {@code cause}, with {@code hasError()} still true after each.
     *
     * @param cause expected type (or supertype) of the thrown exception's cause
     */
    private void assertFatalError(Class<? extends RuntimeException> cause) {
        Assert.assertTrue(transactionManager.hasError());

        // Second pass checks that the first failed abort did not reset the error state.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                transactionManager.beginAbort();
                Assert.fail("Should have raised " + cause.getSimpleName());
            }
            catch (KafkaException e) {
                Assert.assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
                Assert.assertTrue(transactionManager.hasError());
            }
        }
    }

    /**
     * Asserts that {@code future} is already done and that {@code get()} throws an
     * {@code ExecutionException}.
     *
     * @param future the produce future expected to have failed
     * @throws InterruptedException if {@code future.get()} is interrupted
     */
    private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
        Assert.assertTrue(future.isDone());
        try {
            future.get();
        }
        catch (ExecutionException expected) {
            // A failed future is exactly what this helper verifies; nothing to handle.
            return;
        }
        Assert.fail("Expected produce future to throw");
    }
}

