/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TransactionManagerTest {
    private static final int MAX_REQUEST_SIZE = 0x100000;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = Integer.MAX_VALUE;
    private static final String CLIENT_ID = "clientId";
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private static final long BATCH_EXPIRY_TIMEOUT = 1000L;
    private static final long DEFAULT_RETRY_BACKOFF_MS = 100L;
    private final String transactionalId = "foobar";
    private final int transactionTimeoutMs = 1121;
    private final String topic = "test";
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
    private ApiVersions apiVersions = new ApiVersions();
    private Cluster cluster = TestUtils.singletonCluster("test", 2);
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private TransactionManager transactionManager = null;
    private Node brokerNode = null;

    @Before
    public void setup() {
        LinkedHashMap<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", CLIENT_ID);
        int batchSize = 16384;
        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
        this.brokerNode = new Node(0, "localhost", 2211);
        this.transactionManager = new TransactionManager("foobar", 1121, 100L);
        Metrics metrics = new Metrics(metricConfig, (Time)this.time);
        this.accumulator = new RecordAccumulator(batchSize, 0x100000L, CompressionType.NONE, 0L, 0L, metrics, (Time)this.time, this.apiVersions, this.transactionManager);
        this.sender = new Sender((KafkaClient)this.client, this.metadata, this.accumulator, true, 0x100000, -1, Integer.MAX_VALUE, metrics, (Time)this.time, 1000, 50L, this.transactionManager, this.apiVersions, 1000L);
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.setNode(this.brokerNode);
    }

    @Test
    public void testEndTxnNotSentIfIncompleteBatches() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        this.transactionManager.beginCommit();
        Assert.assertNull((Object)this.transactionManager.nextRequestHandler(true));
        Assert.assertTrue((boolean)this.transactionManager.nextRequestHandler(false).isEndTxn());
    }

    @Test(expected=IllegalStateException.class)
    public void testFailIfNotReadyForSendNoProducerId() {
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test
    public void testFailIfNotReadyForSendIdempotentProducer() {
        TransactionManager idempotentTransactionManager = new TransactionManager();
        idempotentTransactionManager.failIfNotReadyForSend();
    }

    @Test(expected=KafkaException.class)
    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
        TransactionManager idempotentTransactionManager = new TransactionManager();
        idempotentTransactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        idempotentTransactionManager.failIfNotReadyForSend();
    }

    @Test(expected=IllegalStateException.class)
    public void testFailIfNotReadyForSendNoOngoingTransaction() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test(expected=KafkaException.class)
    public void testFailIfNotReadyForSendAfterAbortableError() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test(expected=KafkaException.class)
    public void testFailIfNotReadyForSendAfterFatalError() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        this.transactionManager.failIfNotReadyForSend();
    }

    @Test
    public void testHasOngoingTransactionSuccessfulAbort() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.doInitTransactions(pid, epoch);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareAddPartitionsToTxn(partition, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        this.transactionManager.beginAbort();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testHasOngoingTransactionSuccessfulCommit() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.doInitTransactions(pid, epoch);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareAddPartitionsToTxn(partition, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        this.transactionManager.beginCommit();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testHasOngoingTransactionAbortableError() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.doInitTransactions(pid, epoch);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareAddPartitionsToTxn(partition, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginAbort();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testHasOngoingTransactionFatalError() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.doInitTransactions(pid, epoch);
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.beginTransaction();
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        this.prepareAddPartitionsToTxn(partition, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        this.transactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testMaybeAddPartitionToTransaction() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(partition));
        Assert.assertTrue((boolean)this.transactionManager.isPartitionPendingAdd(partition));
        this.prepareAddPartitionsToTxn(partition, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(partition));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionPendingAdd(partition));
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(partition));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionPendingAdd(partition));
    }

    @Test
    public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(partition));
        Assert.assertTrue((boolean)this.transactionManager.isPartitionPendingAdd(partition));
        this.prepareAddPartitionsToTxn(partition, Errors.CONCURRENT_TRANSACTIONS);
        this.sender.run(this.time.milliseconds());
        TransactionManager.TxnRequestHandler handler = this.transactionManager.nextRequestHandler(false);
        Assert.assertNotNull((Object)handler);
        Assert.assertEquals((long)20L, (long)handler.retryBackoffMs());
    }

    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(partition));
        Assert.assertTrue((boolean)this.transactionManager.isPartitionPendingAdd(partition));
        this.prepareAddPartitionsToTxn(partition, Errors.COORDINATOR_NOT_AVAILABLE);
        this.sender.run(this.time.milliseconds());
        TransactionManager.TxnRequestHandler handler = this.transactionManager.nextRequestHandler(false);
        Assert.assertNotNull((Object)handler);
        Assert.assertEquals((long)100L, (long)handler.retryBackoffMs());
    }

    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
        long pid = 13131L;
        short epoch = 1;
        TopicPartition partition = new TopicPartition("foo", 0);
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(partition));
        Assert.assertTrue((boolean)this.transactionManager.isPartitionPendingAdd(partition));
        this.prepareAddPartitionsToTxn(partition, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(partition));
        TopicPartition otherPartition = new TopicPartition("foo", 1);
        this.transactionManager.maybeAddPartitionToTransaction(otherPartition);
        this.prepareAddPartitionsToTxn(otherPartition, Errors.CONCURRENT_TRANSACTIONS);
        TransactionManager.TxnRequestHandler handler = this.transactionManager.nextRequestHandler(false);
        Assert.assertNotNull((Object)handler);
        Assert.assertEquals((long)100L, (long)handler.retryBackoffMs());
    }

    @Test(expected=IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test(expected=IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test(expected=KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterAbortableError() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test(expected=KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterFatalError() {
        long pid = 13131L;
        short epoch = 1;
        this.doInitTransactions(pid, epoch);
        this.transactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        this.transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
    }

    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.sender.run(this.time.milliseconds());
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.hasFatalError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.sender.run(this.time.milliseconds());
        this.transactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.hasFatalError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.transitionToFatalError((RuntimeException)((Object)new KafkaException()));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.hasFatalError());
    }

    @Test
    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
    }

    @Test(expected=IllegalStateException.class)
    public void testInvalidSequenceIncrement() {
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.incrementSequenceNumber(this.tp0, 3333);
    }

    @Test
    public void testDefaultSequenceNumber() {
        TransactionManager transactionManager = new TransactionManager();
        Assert.assertEquals((long)transactionManager.sequenceNumber(this.tp0).intValue(), (long)0L);
        transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assert.assertEquals((long)transactionManager.sequenceNumber(this.tp0).intValue(), (long)3L);
    }

    @Test
    public void testProducerIdReset() {
        TransactionManager transactionManager = new TransactionManager();
        Assert.assertEquals((long)transactionManager.sequenceNumber(this.tp0).intValue(), (long)0L);
        transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assert.assertEquals((long)transactionManager.sequenceNumber(this.tp0).intValue(), (long)3L);
        transactionManager.resetProducerId();
        Assert.assertEquals((long)transactionManager.sequenceNumber(this.tp0).intValue(), (long)0L);
    }

    @Test
    public void testBasicTransaction() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        String consumerGroupId = "myconsumergroup";
        TransactionalRequestResult addOffsetsResult = this.transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");
        Assert.assertFalse((boolean)this.transactionManager.hasPendingOffsetCommits());
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "myconsumergroup", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse((boolean)addOffsetsResult.isCompleted());
        HashMap<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<TopicPartition, Errors>();
        txnOffsetCommitResponse.put(this.tp1, Errors.NONE);
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myconsumergroup");
        this.prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short)1, txnOffsetCommitResponse);
        Assert.assertEquals(null, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertNotNull((Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue((boolean)this.transactionManager.hasPendingOffsetCommits());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasPendingOffsetCommits());
        Assert.assertTrue((boolean)addOffsetsResult.isCompleted());
        this.transactionManager.beginCommit();
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse((boolean)this.transactionManager.isCompleting());
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
    }

    @Test
    public void testDisconnectAndRetry() {
        this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, true, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
    }

    @Test
    public void testUnsupportedFindCoordinator() {
        this.transactionManager.initializeTransactions();
        this.client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest)body;
                Assert.assertEquals((Object)findCoordinatorRequest.coordinatorType(), (Object)FindCoordinatorRequest.CoordinatorType.TRANSACTION);
                Assert.assertEquals((Object)findCoordinatorRequest.coordinatorKey(), (Object)"foobar");
                return true;
            }
        });
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasFatalError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof UnsupportedVersionException));
    }

    @Test
    public void testUnsupportedInitTransactions() {
        this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasError());
        Assert.assertNotNull((Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest)body;
                Assert.assertEquals((Object)initProducerIdRequest.transactionalId(), (Object)"foobar");
                Assert.assertEquals((long)initProducerIdRequest.transactionTimeoutMs(), (long)1121L);
                return true;
            }
        });
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasFatalError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof UnsupportedVersionException));
    }

    @Test
    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
        String consumerGroupId = "consumer";
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition tp = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), "consumer");
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        this.sender.run(this.time.milliseconds());
        this.prepareTxnOffsetCommitResponse("consumer", 13131L, (short)1, Collections.singletonMap(tp, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof UnsupportedForMessageFormatException));
        Assert.assertTrue((boolean)sendOffsetsResult.isCompleted());
        Assert.assertFalse((boolean)sendOffsetsResult.isSuccessful());
        Assert.assertTrue((boolean)(sendOffsetsResult.error() instanceof UnsupportedForMessageFormatException));
        this.assertFatalError(UnsupportedForMessageFormatException.class);
    }

    @Test
    public void testLookupCoordinatorOnDisconnectAfterSend() {
        long pid = 13131L;
        boolean epoch = true;
        TransactionalRequestResult initPidResult = this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.prepareInitPidResponse(Errors.NONE, true, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(null, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse((boolean)initPidResult.isCompleted());
        Assert.assertFalse((boolean)this.transactionManager.hasProducerId());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse((boolean)initPidResult.isCompleted());
        this.prepareInitPidResponse(Errors.NONE, false, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)initPidResult.isCompleted());
        Assert.assertTrue((boolean)this.transactionManager.hasProducerId());
        Assert.assertEquals((long)13131L, (long)this.transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals((long)1L, (long)this.transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testLookupCoordinatorOnDisconnectBeforeSend() {
        long pid = 13131L;
        boolean epoch = true;
        TransactionalRequestResult initPidResult = this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.client.disconnect(this.brokerNode.idString());
        this.client.blackout(this.brokerNode, 100L);
        this.sender.run(this.time.milliseconds());
        this.time.sleep(110L);
        Assert.assertEquals(null, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse((boolean)initPidResult.isCompleted());
        Assert.assertFalse((boolean)this.transactionManager.hasProducerId());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse((boolean)initPidResult.isCompleted());
        this.prepareInitPidResponse(Errors.NONE, false, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)initPidResult.isCompleted());
        Assert.assertTrue((boolean)this.transactionManager.hasProducerId());
        Assert.assertEquals((long)13131L, (long)this.transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals((long)1L, (long)this.transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testLookupCoordinatorOnNotCoordinatorError() {
        long pid = 13131L;
        boolean epoch = true;
        TransactionalRequestResult initPidResult = this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.prepareInitPidResponse(Errors.NOT_COORDINATOR, false, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(null, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse((boolean)initPidResult.isCompleted());
        Assert.assertFalse((boolean)this.transactionManager.hasProducerId());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertFalse((boolean)initPidResult.isCompleted());
        this.prepareInitPidResponse(Errors.NONE, false, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)initPidResult.isCompleted());
        Assert.assertTrue((boolean)this.transactionManager.hasProducerId());
        Assert.assertEquals((long)13131L, (long)this.transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals((long)1L, (long)this.transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
        TransactionalRequestResult initPidResult = this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)initPidResult.isCompleted());
        Assert.assertFalse((boolean)initPidResult.isSuccessful());
        Assert.assertTrue((boolean)(initPidResult.error() instanceof TransactionalIdAuthorizationException));
        this.assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
        long pid = 13131L;
        TransactionalRequestResult initPidResult = this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, 13131L, (short)-1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)initPidResult.isCompleted());
        Assert.assertFalse((boolean)initPidResult.isSuccessful());
        Assert.assertTrue((boolean)(initPidResult.error() instanceof TransactionalIdAuthorizationException));
        this.assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testGroupAuthorizationFailureInFindCoordinator() {
        String consumerGroupId = "consumer";
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), "consumer");
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        this.prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof GroupAuthorizationException));
        Assert.assertTrue((boolean)sendOffsetsResult.isCompleted());
        Assert.assertFalse((boolean)sendOffsetsResult.isSuccessful());
        Assert.assertTrue((boolean)(sendOffsetsResult.error() instanceof GroupAuthorizationException));
        GroupAuthorizationException exception = (GroupAuthorizationException)sendOffsetsResult.error();
        Assert.assertEquals((Object)"consumer", (Object)exception.groupId());
        this.assertAbortableError(GroupAuthorizationException.class);
    }

    @Test
    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
        String consumerGroupId = "consumer";
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition tp = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), "consumer");
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        this.sender.run(this.time.milliseconds());
        this.prepareTxnOffsetCommitResponse("consumer", 13131L, (short)1, Collections.singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof GroupAuthorizationException));
        Assert.assertTrue((boolean)sendOffsetsResult.isCompleted());
        Assert.assertFalse((boolean)sendOffsetsResult.isSuccessful());
        Assert.assertTrue((boolean)(sendOffsetsResult.error() instanceof GroupAuthorizationException));
        GroupAuthorizationException exception = (GroupAuthorizationException)sendOffsetsResult.error();
        Assert.assertEquals((Object)"consumer", (Object)exception.groupId());
        this.assertAbortableError(GroupAuthorizationException.class);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
        String consumerGroupId = "consumer";
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition tp = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), "consumer");
        this.prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, "consumer", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException));
        Assert.assertTrue((boolean)sendOffsetsResult.isCompleted());
        Assert.assertFalse((boolean)sendOffsetsResult.isSuccessful());
        Assert.assertTrue((boolean)(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException));
        this.assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
        String consumerGroupId = "consumer";
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition tp = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(tp, new OffsetAndMetadata(39L)), "consumer");
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "consumer", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "consumer");
        this.sender.run(this.time.milliseconds());
        this.prepareTxnOffsetCommitResponse("consumer", 13131L, (short)1, Collections.singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException));
        Assert.assertTrue((boolean)sendOffsetsResult.isCompleted());
        Assert.assertFalse((boolean)sendOffsetsResult.isSuccessful());
        Assert.assertTrue((boolean)(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException));
        this.assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testTopicAuthorizationFailureInAddPartitions() {
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition tp0 = new TopicPartition("foo", 0);
        TopicPartition tp1 = new TopicPartition("bar", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(tp0);
        this.transactionManager.maybeAddPartitionToTransaction(tp1);
        HashMap<TopicPartition, Errors> errors = new HashMap<TopicPartition, Errors>();
        errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
        this.prepareAddPartitionsToTxn(errors);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof TopicAuthorizationException));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionPendingAdd(tp0));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionPendingAdd(tp1));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(tp1));
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        TopicAuthorizationException exception = (TopicAuthorizationException)this.transactionManager.lastError();
        Assert.assertEquals(Collections.singleton(tp0.topic()), (Object)exception.unauthorizedTopics());
        this.assertAbortableError(TopicAuthorizationException.class);
    }

    @Test
    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)unauthorizedPartition, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
        this.transactionManager.beginAbort();
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        this.assertFutureFailed((Future<RecordMetadata>)responseFuture);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.accumulator.hasIncomplete());
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.beginCommit();
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        Assert.assertNotNull(responseFuture.get());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        FutureRecordMetadata authorizedTopicProduceFuture = this.accumulator.append((TopicPartition)unauthorizedPartition, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        this.transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata unauthorizedTopicProduceFuture = this.accumulator.append((TopicPartition)unauthorizedPartition, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse((boolean)authorizedTopicProduceFuture.isDone());
        Assert.assertFalse((boolean)unauthorizedTopicProduceFuture.isDone());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.transactionManager.beginAbort();
        this.sender.run(this.time.milliseconds());
        this.assertFutureFailed((Future<RecordMetadata>)authorizedTopicProduceFuture);
        this.assertFutureFailed((Future<RecordMetadata>)unauthorizedTopicProduceFuture);
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.accumulator.hasIncomplete());
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata nextTransactionFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.beginCommit();
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)nextTransactionFuture.isDone());
        Assert.assertNotNull((Object)nextTransactionFuture.get());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        FutureRecordMetadata authorizedTopicProduceFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        this.accumulator.beginFlush();
        this.prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)authorizedTopicProduceFuture.isDone());
        Assert.assertTrue((boolean)this.accumulator.hasIncomplete());
        this.transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata unauthorizedTopicProduceFuture = this.accumulator.append((TopicPartition)unauthorizedPartition, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse((boolean)authorizedTopicProduceFuture.isDone());
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.assertFutureFailed((Future<RecordMetadata>)unauthorizedTopicProduceFuture);
        Assert.assertTrue((boolean)authorizedTopicProduceFuture.isDone());
        Assert.assertNotNull(authorizedTopicProduceFuture.get());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.transactionManager.beginAbort();
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        Assert.assertFalse((boolean)this.accumulator.hasIncomplete());
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata nextTransactionFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isPartitionAdded(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.hasPartitionsToAdd());
        this.transactionManager.beginCommit();
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)nextTransactionFuture.isDone());
        Assert.assertNotNull((Object)nextTransactionFuture.get());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
        long pid = 13131L;
        boolean epoch = true;
        TopicPartition tp = new TopicPartition("foo", 0);
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(tp);
        this.prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertTrue((boolean)(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException));
        this.assertFatalError(TransactionalIdAuthorizationException.class);
    }

    @Test
    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse((boolean)responseFuture.isDone());
        Assert.assertFalse((boolean)commitResult.isCompleted());
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        Assert.assertFalse((boolean)commitResult.isCompleted());
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        Assert.assertTrue((boolean)this.transactionManager.isCompleting());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)commitResult.isCompleted());
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
    }

    @Test
    public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        FutureRecordMetadata secondResponseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp1, (short)1, 13131L);
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp1));
        Assert.assertFalse((boolean)responseFuture.isDone());
        Assert.assertFalse((boolean)secondResponseFuture.isDone());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp1));
        Assert.assertFalse((boolean)responseFuture.isDone());
        Assert.assertFalse((boolean)secondResponseFuture.isDone());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        Assert.assertTrue((boolean)secondResponseFuture.isDone());
    }

    @Test(expected=ExecutionException.class)
    public void testProducerFencedException() throws InterruptedException, ExecutionException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        responseFuture.get();
    }

    @Test
    public void testDisallowCommitOnProduceFailure() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)commitResult.isCompleted());
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)commitResult.isCompleted());
        try {
            commitResult.await();
            Assert.fail();
        }
        catch (KafkaException kafkaException) {
            // empty catch block
        }
        try {
            responseFuture.get();
            Assert.fail((String)"Expected produce future to raise an exception");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof OutOfOrderSequenceException));
        }
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testAllowAbortOnProduceFailure() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short)1);
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        Assert.assertTrue((boolean)this.transactionManager.isAborting());
        Assert.assertFalse((boolean)this.transactionManager.hasError());
        this.sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short)1);
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isAborting());
        Assert.assertFalse((boolean)this.transactionManager.hasError());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.accumulator.hasUndrained());
        this.transactionManager.beginCommit();
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertTrue((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertTrue((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.sendProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertFalse((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasInFlightRequest());
        this.sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.accumulator.hasUndrained());
        this.accumulator.beginFlush();
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertTrue((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        this.transactionManager.beginCommit();
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertTrue((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertTrue((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.sendProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        Assert.assertFalse((boolean)this.accumulator.hasUndrained());
        Assert.assertFalse((boolean)this.accumulator.hasIncomplete());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasInFlightRequest());
        this.sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.hasInFlightRequest());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
    }

    @Test
    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.sender.run(this.time.milliseconds());
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        this.sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
        Assert.assertNull((Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        try {
            responseFuture.get();
            Assert.fail((String)"Expected produce future to raise an exception");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof KafkaException));
        }
    }

    @Test
    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
        long producerId = 13131L;
        boolean producerEpoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, this.tp0, (short)1, 13131L);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)responseFuture.isDone());
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        try {
            responseFuture.get();
            Assert.fail((String)"Expected produce future to raise an exception");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof KafkaException));
        }
    }

    @Test
    public void testAbortResendsProduceRequestIfRetried() throws Exception {
        long producerId = 13131L;
        boolean producerEpoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short)1);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)responseFuture.isDone());
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        RecordMetadata recordMetadata = (RecordMetadata)responseFuture.get();
        Assert.assertEquals((Object)this.tp0.topic(), (Object)recordMetadata.topic());
    }

    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
    }

    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        String consumerGroupId = "myconsumergroup";
        TransactionalRequestResult addOffsetsResult = this.transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");
        this.prepareAddOffsetsToTxnResponse(Errors.NONE, "myconsumergroup", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)addOffsetsResult.isCompleted());
        HashMap<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<TopicPartition, Errors>();
        txnOffsetCommitResponse.put(this.tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myconsumergroup");
        this.prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short)1, txnOffsetCommitResponse);
        Assert.assertEquals(null, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertNotNull((Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue((boolean)this.transactionManager.hasPendingOffsetCommits());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse((boolean)addOffsetsResult.isCompleted());
        txnOffsetCommitResponse.put(this.tp1, Errors.NONE);
        this.prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short)1, txnOffsetCommitResponse);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)addOffsetsResult.isCompleted());
        Assert.assertTrue((boolean)addOffsetsResult.isSuccessful());
    }

    @Test
    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
        this.verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
    }

    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, this.tp0, (short)1, 13131L);
        this.sender.run(this.time.milliseconds());
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        Assert.assertFalse((boolean)abortResult.isCompleted());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
    }

    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        String consumerGroupId = "myconsumergroup";
        this.transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, "myconsumergroup", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)abortResult.isCompleted());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.isReady());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
    }

    @Test
    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        String consumerGroupId = "myconsumergroup";
        this.transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, "myconsumergroup", 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)abortResult.isCompleted());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertFalse((boolean)abortResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.hasFatalError());
    }

    @Test
    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Node node1 = new Node(0, "localhost", 1111);
        Node node2 = new Node(1, "localhost", 1112);
        PartitionInfo part1 = new PartitionInfo("test", 0, node1, null, null);
        PartitionInfo part2 = new PartitionInfo("test", 1, node2, null, null);
        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> nodes = new HashSet<Node>();
        nodes.add(node1);
        nodes.add(node2);
        Map drainedBatches = this.accumulator.drain(cluster, nodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertTrue((boolean)drainedBatches.containsKey(node1.id()));
        Assert.assertTrue((boolean)((List)drainedBatches.get(node1.id())).isEmpty());
        Assert.assertTrue((boolean)drainedBatches.containsKey(node2.id()));
        Assert.assertTrue((boolean)((List)drainedBatches.get(node2.id())).isEmpty());
        Assert.assertFalse((boolean)this.transactionManager.hasError());
    }

    @Test
    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        this.prepareAddPartitionsToTxn(this.tp1, Errors.NONE);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp1));
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.prepareAddPartitionsToTxn(this.tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Node node1 = new Node(1, "localhost", 1112);
        PartitionInfo part1 = new PartitionInfo("test", 1, node1, null, null);
        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1), Collections.emptySet(), Collections.emptySet());
        this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        Map drainedBatches = this.accumulator.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertTrue((boolean)drainedBatches.containsKey(node1.id()));
        Assert.assertEquals((long)1L, (long)((List)drainedBatches.get(node1.id())).size());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, 1000L);
        Node node1 = new Node(0, "localhost", 1111);
        PartitionInfo part1 = new PartitionInfo("test", 0, node1, null, null);
        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> nodes = new HashSet<Node>();
        nodes.add(node1);
        Map drainedBatches = this.accumulator.drain(cluster, nodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assert.assertTrue((boolean)drainedBatches.containsKey(node1.id()));
        Assert.assertTrue((boolean)((List)drainedBatches.get(node1.id())).isEmpty());
    }

    @Test
    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        this.prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.transactionManager.transitionToAbortableError((RuntimeException)((Object)new KafkaException()));
        this.prepareProduceResponse(Errors.NONE, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        Assert.assertNotNull(responseFuture.get());
    }

    @Test
    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException, ExecutionException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.cluster.nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.client.blackout(clusterNode, 100L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        try {
            responseFuture.get();
            Assert.fail((String)"Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TimeoutException));
        }
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException, ExecutionException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        this.transactionManager.maybeAddPartitionToTransaction(this.tp1);
        FutureRecordMetadata firstBatchResponse = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        FutureRecordMetadata secondBatchResponse = this.accumulator.append((TopicPartition)this.tp1, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)firstBatchResponse.isDone());
        Assert.assertFalse((boolean)secondBatchResponse.isDone());
        HashMap<TopicPartition, Errors> partitionErrors = new HashMap<TopicPartition, Errors>();
        partitionErrors.put(this.tp0, Errors.NONE);
        partitionErrors.put(this.tp1, Errors.NONE);
        this.prepareAddPartitionsToTxn(partitionErrors);
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp1));
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp1));
        Assert.assertFalse((boolean)firstBatchResponse.isDone());
        Assert.assertFalse((boolean)secondBatchResponse.isDone());
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.cluster.nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.client.blackout(clusterNode, 100L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)firstBatchResponse.isDone());
        Assert.assertTrue((boolean)secondBatchResponse.isDone());
        try {
            firstBatchResponse.get();
            Assert.fail((String)"Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TimeoutException));
        }
        try {
            secondBatchResponse.get();
            Assert.fail((String)"Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TimeoutException));
        }
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
    }

    @Test
    public void testDropCommitOnBatchExpiry() throws InterruptedException, ExecutionException {
        long pid = 13131L;
        boolean epoch = true;
        this.doInitTransactions(13131L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short)1, 13131L);
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue((boolean)this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse((boolean)responseFuture.isDone());
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        this.time.sleep(10000L);
        Node clusterNode = (Node)this.cluster.nodes().get(0);
        this.client.disconnect(clusterNode.idString());
        this.client.blackout(clusterNode, 100L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)responseFuture.isDone());
        try {
            responseFuture.get();
            Assert.fail((String)"Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TimeoutException));
        }
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)commitResult.isCompleted());
        Assert.assertFalse((boolean)commitResult.isSuccessful());
        Assert.assertTrue((boolean)this.transactionManager.hasAbortableError());
        Assert.assertTrue((boolean)this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse((boolean)this.transactionManager.isCompleting());
        Assert.assertTrue((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        this.prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short)1);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)abortResult.isCompleted());
        Assert.assertTrue((boolean)abortResult.isSuccessful());
        Assert.assertFalse((boolean)this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
    }

    private void verifyAddPartitionsFailsWithPartitionLevelError(Errors error) throws InterruptedException {
        long pid = 1L;
        boolean epoch = true;
        this.doInitTransactions(1L, (short)1);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);
        FutureRecordMetadata responseFuture = this.accumulator.append((TopicPartition)this.tp0, (long)this.time.milliseconds(), (byte[])"key".getBytes(), (byte[])"value".getBytes(), (Header[])Record.EMPTY_HEADERS, null, (long)1000L).future;
        Assert.assertFalse((boolean)responseFuture.isDone());
        this.prepareAddPartitionsToTxn(this.tp0, error);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        Assert.assertFalse((boolean)this.transactionManager.transactionContainsPartition(this.tp0));
    }

    private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors) {
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest)body;
                Assert.assertEquals(new HashSet(request.partitions()), new HashSet(errors.keySet()));
                return true;
            }
        }, (AbstractResponse)new AddPartitionsToTxnResponse(0, errors));
    }

    private void prepareAddPartitionsToTxn(TopicPartition tp, Errors error) {
        this.prepareAddPartitionsToTxn(Collections.singletonMap(tp, error));
    }

    private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect, final FindCoordinatorRequest.CoordinatorType coordinatorType, final String coordinatorKey) {
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest)body;
                Assert.assertEquals((Object)findCoordinatorRequest.coordinatorType(), (Object)coordinatorType);
                Assert.assertEquals((Object)findCoordinatorRequest.coordinatorKey(), (Object)coordinatorKey);
                return true;
            }
        }, (AbstractResponse)new FindCoordinatorResponse(error, this.brokerNode), shouldDisconnect);
    }

    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long pid, short epoch) {
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest)body;
                Assert.assertEquals((Object)initProducerIdRequest.transactionalId(), (Object)"foobar");
                Assert.assertEquals((long)initProducerIdRequest.transactionTimeoutMs(), (long)1121L);
                return true;
            }
        }, (AbstractResponse)new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
    }

    private void sendProduceResponse(Errors error, long pid, short epoch) {
        this.client.respond(this.produceRequestMatcher(pid, epoch), (AbstractResponse)this.produceResponse(this.tp0, 0L, error, 0));
    }

    private void prepareProduceResponse(Errors error, long pid, short epoch) {
        this.client.prepareResponse(this.produceRequestMatcher(pid, epoch), (AbstractResponse)this.produceResponse(this.tp0, 0L, error, 0));
    }

    private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                ProduceRequest produceRequest = (ProduceRequest)body;
                MemoryRecords records = (MemoryRecords)produceRequest.partitionRecordsOrFail().get(TransactionManagerTest.this.tp0);
                Assert.assertNotNull((Object)records);
                Iterator batchIterator = records.batches().iterator();
                Assert.assertTrue((boolean)batchIterator.hasNext());
                MutableRecordBatch batch = (MutableRecordBatch)batchIterator.next();
                Assert.assertFalse((boolean)batchIterator.hasNext());
                Assert.assertTrue((boolean)batch.isTransactional());
                Assert.assertEquals((long)pid, (long)batch.producerId());
                Assert.assertEquals((long)epoch, (long)batch.producerEpoch());
                Assert.assertEquals((Object)"foobar", (Object)produceRequest.transactionalId());
                return true;
            }
        };
    }

    private void prepareAddPartitionsToTxnResponse(Errors error, TopicPartition topicPartition, short epoch, long pid) {
        this.client.prepareResponse(this.addPartitionsRequestMatcher(topicPartition, epoch, pid), (AbstractResponse)new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error)));
    }

    private void sendAddPartitionsToTxnResponse(Errors error, TopicPartition topicPartition, short epoch, long pid) {
        this.client.respond(this.addPartitionsRequestMatcher(topicPartition, epoch, pid), (AbstractResponse)new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, error)));
    }

    private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition, final short epoch, final long pid) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest)body;
                Assert.assertEquals((long)pid, (long)addPartitionsToTxnRequest.producerId());
                Assert.assertEquals((long)epoch, (long)addPartitionsToTxnRequest.producerEpoch());
                Assert.assertEquals(Collections.singletonList(topicPartition), (Object)addPartitionsToTxnRequest.partitions());
                Assert.assertEquals((Object)"foobar", (Object)addPartitionsToTxnRequest.transactionalId());
                return true;
            }
        };
    }

    private void prepareEndTxnResponse(Errors error, TransactionResult result, long pid, short epoch) {
        this.client.prepareResponse(this.endTxnMatcher(result, pid, epoch), (AbstractResponse)new EndTxnResponse(0, error));
    }

    private void sendEndTxnResponse(Errors error, TransactionResult result, long pid, short epoch) {
        this.client.respond(this.endTxnMatcher(result, pid, epoch), (AbstractResponse)new EndTxnResponse(0, error));
    }

    private MockClient.RequestMatcher endTxnMatcher(final TransactionResult result, final long pid, final short epoch) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                EndTxnRequest endTxnRequest = (EndTxnRequest)body;
                Assert.assertEquals((Object)"foobar", (Object)endTxnRequest.transactionalId());
                Assert.assertEquals((long)pid, (long)endTxnRequest.producerId());
                Assert.assertEquals((long)epoch, (long)endTxnRequest.producerEpoch());
                Assert.assertEquals((Object)result, (Object)endTxnRequest.command());
                return true;
            }
        };
    }

    private void prepareAddOffsetsToTxnResponse(Errors error, final String consumerGroupId, final long producerId, final short producerEpoch) {
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest)body;
                Assert.assertEquals((Object)consumerGroupId, (Object)addOffsetsToTxnRequest.consumerGroupId());
                Assert.assertEquals((Object)"foobar", (Object)addOffsetsToTxnRequest.transactionalId());
                Assert.assertEquals((long)producerId, (long)addOffsetsToTxnRequest.producerId());
                Assert.assertEquals((long)producerEpoch, (long)addOffsetsToTxnRequest.producerEpoch());
                return true;
            }
        }, (AbstractResponse)new AddOffsetsToTxnResponse(0, error));
    }

    private void prepareTxnOffsetCommitResponse(final String consumerGroupId, final long producerId, final short producerEpoch, Map<TopicPartition, Errors> txnOffsetCommitResponse) {
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                TxnOffsetCommitRequest txnOffsetCommitRequest = (TxnOffsetCommitRequest)body;
                Assert.assertEquals((Object)consumerGroupId, (Object)txnOffsetCommitRequest.consumerGroupId());
                Assert.assertEquals((long)producerId, (long)txnOffsetCommitRequest.producerId());
                Assert.assertEquals((long)producerEpoch, (long)txnOffsetCommitRequest.producerEpoch());
                return true;
            }
        }, (AbstractResponse)new TxnOffsetCommitResponse(0, txnOffsetCommitResponse));
    }

    private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs) {
        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, -1L);
        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
        return new ProduceResponse(partResp, throttleTimeMs);
    }

    private void doInitTransactions(long pid, short epoch) {
        this.transactionManager.initializeTransactions();
        this.prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals((Object)this.brokerNode, (Object)this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        this.prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue((boolean)this.transactionManager.hasProducerId());
    }

    private void assertAbortableError(Class<? extends RuntimeException> cause) {
        try {
            this.transactionManager.beginTransaction();
            Assert.fail((String)("Should have raised " + cause.getSimpleName()));
        }
        catch (KafkaException e) {
            Assert.assertTrue((boolean)cause.isAssignableFrom(e.getCause().getClass()));
            Assert.assertTrue((boolean)this.transactionManager.hasError());
        }
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        this.transactionManager.beginAbort();
        Assert.assertFalse((boolean)this.transactionManager.hasError());
    }

    private void assertFatalError(Class<? extends RuntimeException> cause) {
        Assert.assertTrue((boolean)this.transactionManager.hasError());
        try {
            this.transactionManager.beginAbort();
            Assert.fail((String)("Should have raised " + cause.getSimpleName()));
        }
        catch (KafkaException e) {
            Assert.assertTrue((boolean)cause.isAssignableFrom(e.getCause().getClass()));
            Assert.assertTrue((boolean)this.transactionManager.hasError());
        }
        try {
            this.transactionManager.beginAbort();
            Assert.fail((String)("Should have raised " + cause.getSimpleName()));
        }
        catch (KafkaException e) {
            Assert.assertTrue((boolean)cause.isAssignableFrom(e.getCause().getClass()));
            Assert.assertTrue((boolean)this.transactionManager.hasError());
        }
    }

    private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
        Assert.assertTrue((boolean)future.isDone());
        try {
            future.get();
            Assert.fail((String)"Expected produce future to throw");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
    }
}

