package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/TransactionManagerTest.class */
public class TransactionManagerTest {
    // Sizing, acks, retry and timeout constants for the producer components built by the fixture.
    private static final int MAX_REQUEST_SIZE = 1048576;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = Integer.MAX_VALUE;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    // Same value (100) that initializeTransactionManager passes to the TransactionManager constructor.
    private static final long DEFAULT_RETRY_BACKOFF_MS = 100;
    // Same value setup() wraps in an Optional when building the transactional fixture.
    private final String transactionalId = "foobar";
    // Same value (1121) that initializeTransactionManager passes to the TransactionManager constructor.
    private final int transactionTimeoutMs = 1121;
    // Topic registered with the metadata in setup(), plus its partitions 0 and 1.
    private final String topic = "test";
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    // Producer id / epoch pair the tests use when preparing broker responses.
    private final long producerId = 13131;
    private final short epoch = 1;
    // Consumer group identity values.
    private final String consumerGroupId = "myConsumerGroup";
    private final String memberId = "member";
    private final int generationId = 5;
    private final String groupInstanceId = "instance";
    // Shared collaborators: the mock clock drives both the metadata and the mock network client.
    private final LogContext logContext = new LogContext();
    private final MockTime time = new MockTime();
    private final ProducerMetadata metadata = new ProducerMetadata(0, 0, Long.MAX_VALUE, Long.MAX_VALUE, this.logContext, new ClusterResourceListeners(), this.time);
    private final MockClient client = new MockClient((Time) this.time, (Metadata) this.metadata);
    private final ApiVersions apiVersions = new ApiVersions();
    // Objects under test; (re)created by initializeTransactionManager(...).
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private TransactionManager transactionManager = null;
    private Node brokerNode = null;

    /**
     * Builds the per-test fixture: registers the "test" topic with the metadata, pushes a
     * metadata update into the mock client, and creates a manager for transactional id "foobar".
     */
    @BeforeEach
    public void setup() {
        final long now = time.milliseconds();
        metadata.add("test", now);
        client.updateMetadata(
                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        brokerNode = new Node(0, "localhost", 2211);
        initializeTransactionManager(Optional.of("foobar"));
    }

    /**
     * (Re)creates the transaction manager, broker node, record accumulator and sender used by
     * the tests.
     *
     * @param optional transactional id to configure; when empty, {@code null} is passed to the
     *                 {@link TransactionManager} constructor instead
     */
    private void initializeTransactionManager(Optional<String> optional) {
        final Metrics metrics = new Metrics(time);
        final String metricGroup = "producer-metrics";
        final int batchSize = 16384;

        // Advertise the supported version ranges for the two APIs listed below.
        // NOTE(review): the node key comes from KafkaChannelTest.CHANNEL_ID, which looks like a
        // decompiler constant substitution; presumably it equals the broker id string — confirm.
        ApiVersionsResponseData.ApiVersion initProducerIdRange = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 3);
        ApiVersionsResponseData.ApiVersion produceRange = new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7);
        apiVersions.update(
                KafkaChannelTest.CHANNEL_ID,
                NodeApiVersions.create(Arrays.asList(initProducerIdRange, produceRange)));

        transactionManager = new TransactionManager(logContext, optional.orElse(null), 1121, 100L, apiVersions);
        brokerNode = new Node(0, "localhost", 2211);

        BufferPool bufferPool = new BufferPool(1048576L, batchSize, metrics, time, metricGroup);
        accumulator = new RecordAccumulator(
                logContext, batchSize, CompressionType.NONE, 0, 0L, 0L, 3000,
                metrics, metricGroup, time, apiVersions, transactionManager, bufferPool);

        sender = new Sender(
                logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES,
                new SenderMetricsRegistry(metrics), time, 1000, 50L, transactionManager, apiVersions);
    }

    /**
     * Initiates sender close while a transaction with an appended record is open, then starts a
     * commit and drives the sender until both the commit result and the record future complete.
     */
    @Test
    public void testSenderShutdownWithPendingTransactions() throws Exception {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> !client.hasPendingResponses());

        sender.initiateClose();
        sender.runOnce();

        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(commitResult::isCompleted);
        runUntil(sendFuture::isDone);
    }

    /**
     * After beginCommit(), {@code nextRequest(true)} must yield nothing while
     * {@code nextRequest(false)} must yield an EndTxn handler.
     */
    @Test
    public void testEndTxnNotSentIfIncompleteBatches() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        transactionManager.beginCommit();
        Assertions.assertNull(transactionManager.nextRequest(true));
        Assertions.assertTrue(transactionManager.nextRequest(false).isEndTxn());
    }

    /**
     * Adding a partition before doInitTransactions() has run must raise
     * {@link IllegalStateException}.
     */
    @Test
    public void testFailIfNotReadyForSendNoProducerId() {
        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * With a manager built without a transactional id, adding a partition must complete
     * without throwing.
     *
     * <p>The expectation is stated through {@code assertDoesNotThrow} so the test carries an
     * explicit assertion and a failure is reported as an assertion failure rather than as an
     * unexpected error.
     */
    @Test
    public void testFailIfNotReadyForSendIdempotentProducer() {
        initializeTransactionManager(Optional.empty());
        Assertions.assertDoesNotThrow(() -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * With a manager built without a transactional id and moved to the fatal error state,
     * adding a partition must raise {@link KafkaException}.
     */
    @Test
    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
        initializeTransactionManager(Optional.empty());
        transactionManager.transitionToFatalError(new KafkaException());

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * After doInitTransactions() but without beginTransaction(), adding a partition must raise
     * {@link IllegalStateException}.
     */
    @Test
    public void testFailIfNotReadyForSendNoOngoingTransaction() {
        doInitTransactions();

        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Once an open transaction has transitioned to the abortable error state, adding a
     * partition must raise {@link KafkaException}.
     */
    @Test
    public void testFailIfNotReadyForSendAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.transitionToAbortableError(new KafkaException());

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Once the manager has transitioned to the fatal error state, adding a partition must
     * raise {@link KafkaException}.
     */
    @Test
    public void testFailIfNotReadyForSendAfterFatalError() {
        doInitTransactions();
        transactionManager.transitionToFatalError(new KafkaException());

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Tracks {@code hasOngoingTransaction()} through init, begin, partition add and abort:
     * false before beginTransaction(), true from there through beginAbort(), and false again
     * once the EndTxn(ABORT) response has been processed.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulAbort() {
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(partition);
        runUntil(transactionManager::hasOngoingTransaction);

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(partition));

        transactionManager.beginAbort();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
    }

    /**
     * Tracks {@code hasOngoingTransaction()} through init, begin, partition add and commit:
     * false before beginTransaction(), true from there through beginCommit(), and false again
     * once the EndTxn(COMMIT) response has been processed.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulCommit() {
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(partition);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(partition));

        transactionManager.beginCommit();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
    }

    /**
     * A transaction that hits an abortable error must still count as ongoing, both right after
     * the error and after beginAbort(), until the EndTxn(ABORT) response has been processed.
     */
    @Test
    public void testHasOngoingTransactionAbortableError() {
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(partition);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(partition));

        transactionManager.transitionToAbortableError(new KafkaException());
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.beginAbort();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(() -> !transactionManager.hasOngoingTransaction());
    }

    /**
     * A transaction that hits a fatal error must no longer be reported as ongoing.
     */
    @Test
    public void testHasOngoingTransactionFatalError() {
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions();
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.maybeAddPartition(partition);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(partition));

        transactionManager.transitionToFatalError(new KafkaException());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Follows a partition from "pending add" to "added" and checks that adding the same
     * partition a second time leaves nothing pending.
     */
    @Test
    public void testMaybeAddPartitionToTransaction() {
        final TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        // First add: the partition is queued but not yet acknowledged.
        transactionManager.maybeAddPartition(partition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(partition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(partition));

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());

        runUntil(() -> transactionManager.isPartitionAdded(partition));
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionPendingAdd(partition));

        // Second add of the same partition: state must stay "added" with nothing pending.
        transactionManager.maybeAddPartition(partition);
        Assertions.assertFalse(transactionManager.hasPartitionsToAdd());
        Assertions.assertTrue(transactionManager.isPartitionAdded(partition));
        Assertions.assertFalse(transactionManager.isPartitionPendingAdd(partition));
    }

    /**
     * After an AddPartitionsToTxn response carrying CONCURRENT_TRANSACTIONS, the next request
     * handler is expected to report a retry backoff of 20.
     */
    @Test
    public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
        final TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(partition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(partition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(partition));

        prepareAddPartitionsToTxn(partition, Errors.CONCURRENT_TRANSACTIONS);
        runUntil(() -> !client.hasPendingResponses());

        TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(retryHandler);
        Assertions.assertEquals(20L, retryHandler.retryBackoffMs());
    }

    /**
     * After an AddPartitionsToTxn response carrying COORDINATOR_NOT_AVAILABLE, the next request
     * handler is expected to report a retry backoff of 100, the value the fixture passes to the
     * manager's constructor.
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
        final TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(partition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(partition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(partition));

        prepareAddPartitionsToTxn(partition, Errors.COORDINATOR_NOT_AVAILABLE);
        runUntil(() -> !client.hasPendingResponses());

        TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(retryHandler);
        Assertions.assertEquals(100L, retryHandler.retryBackoffMs());
    }

    /**
     * With one partition already added to the transaction, the handler for adding a second
     * partition is expected to report a retry backoff of 100.
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
        final TopicPartition firstPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(firstPartition);
        Assertions.assertTrue(transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(transactionManager.isPartitionAdded(firstPartition));
        Assertions.assertTrue(transactionManager.isPartitionPendingAdd(firstPartition));

        prepareAddPartitionsToTxn(firstPartition, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(firstPartition));

        final TopicPartition secondPartition = new TopicPartition("foo", 1);
        transactionManager.maybeAddPartition(secondPartition);

        // NOTE(review): this prepared response is never driven through the sender before the
        // handler is inspected below — confirm that is intentional.
        prepareAddPartitionsToTxn(secondPartition, Errors.CONCURRENT_TRANSACTIONS);

        TransactionManager.TxnRequestHandler pendingHandler = transactionManager.nextRequest(false);
        Assertions.assertNotNull(pendingHandler);
        Assertions.assertEquals(100L, pendingHandler.retryBackoffMs());
    }

    /**
     * Adding a partition on a freshly built manager, with no initialization at all, must raise
     * {@link IllegalStateException}.
     */
    @Test
    public void testNotReadyForSendBeforeInitTransactions() {
        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Adding a partition after initialization but before beginTransaction() must raise
     * {@link IllegalStateException}.
     */
    @Test
    public void testNotReadyForSendBeforeBeginTransaction() {
        doInitTransactions();

        Assertions.assertThrows(
                IllegalStateException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Adding a partition after the open transaction entered the abortable error state must
     * raise {@link KafkaException}.
     */
    @Test
    public void testNotReadyForSendAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.transitionToAbortableError(new KafkaException());

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * Adding a partition after the manager entered the fatal error state must raise
     * {@link KafkaException}.
     */
    @Test
    public void testNotReadyForSendAfterFatalError() {
        doInitTransactions();
        transactionManager.transitionToFatalError(new KafkaException());

        Assertions.assertThrows(
                KafkaException.class,
                () -> transactionManager.maybeAddPartition(tp0));
    }

    /**
     * A partition that was requested but not yet added must not be sendable once the
     * transaction is in the abortable error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        transactionManager.transitionToAbortableError(new KafkaException());

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition whose add request is still in flight must not be sendable once the
     * transaction is in the abortable error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        // Drive the sender until a request is outstanding.
        runUntil(transactionManager::hasInFlightRequest);

        transactionManager.transitionToAbortableError(new KafkaException());

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition that was requested but not yet added must not be sendable once the manager
     * is in the fatal error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        transactionManager.transitionToFatalError(new KafkaException());

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition whose add request is still in flight must not be sendable once the manager
     * is in the fatal error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        // Drive the sender until a request is outstanding.
        runUntil(transactionManager::hasInFlightRequest);

        transactionManager.transitionToFatalError(new KafkaException());

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition that was already added to the transaction must remain sendable after the
     * transaction moves to the abortable error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        transactionManager.transitionToAbortableError(new KafkaException());

        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Even a partition that was already added to the transaction must not be sendable after
     * the manager moves to the fatal error state.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> !transactionManager.hasPartitionsToAdd());

        transactionManager.transitionToFatalError(new KafkaException());

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * Inside an open transaction, a partition that was never passed to maybeAddPartition()
     * must not be sendable.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
        doInitTransactions();
        transactionManager.beginTransaction();

        final boolean sendAllowed = transactionManager.isSendToPartitionAllowed(tp0);
        Assertions.assertFalse(sendAllowed);
    }

    /**
     * With a manager built without a transactional id, a partition's sequence number starts at
     * 0 and reflects the increment applied through incrementSequenceNumber().
     *
     * <p>The expected value is passed as the first argument of {@code assertEquals}, so a
     * failure message reports "expected" and "actual" the right way round.
     */
    @Test
    public void testDefaultSequenceNumber() {
        initializeTransactionManager(Optional.empty());
        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp0));

        transactionManager.incrementSequenceNumber(tp0, 3);
        Assertions.assertEquals(3, transactionManager.sequenceNumber(tp0));
    }

    /**
     * Writes five batches to tp0, completes the first, reports UNKNOWN_PRODUCER_ID for the
     * second, and then expects the epoch to reach 2 with the four remaining batches
     * re-sequenced from 0.
     */
    @Test
    public void testBumpEpochAndResetSequenceNumbersAfterUnknownProducerId() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, (short) 1);

        ProducerBatch batch1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch batch2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        ProducerBatch batch3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
        ProducerBatch batch4 = writeIdempotentBatchWithValue(transactionManager, tp0, "4");
        ProducerBatch batch5 = writeIdempotentBatchWithValue(transactionManager, tp0, "5");
        Assertions.assertEquals(5, transactionManager.sequenceNumber(tp0));

        // The first batch completes successfully.
        final long now = time.milliseconds();
        ProduceResponse.PartitionResponse successResponse =
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, now, 0L);
        batch1.complete(500L, now);
        transactionManager.handleCompletedBatch(batch1, successResponse);

        // The second batch comes back with UNKNOWN_PRODUCER_ID and must be retriable.
        ProduceResponse.PartitionResponse unknownProducerResponse =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L);
        Assertions.assertTrue(transactionManager.canRetry(unknownProducerResponse, batch2));

        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);

        Assertions.assertEquals(2, batch2.producerEpoch());
        Assertions.assertEquals(0, batch2.baseSequence());
        Assertions.assertEquals(1, batch3.baseSequence());
        Assertions.assertEquals(2, batch4.baseSequence());
        Assertions.assertEquals(3, batch5.baseSequence());
    }

    /**
     * Starting at epoch {@code Short.MAX_VALUE}, reports UNKNOWN_PRODUCER_ID for an already
     * completed tp0 batch and bumps the epoch. Afterwards tp0's sequence number is expected to
     * be 1 while tp1's stays at 2, and each partition's second batch is the next one by sequence.
     */
    @Test
    public void testBatchFailureAfterProducerReset() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, Short.MAX_VALUE);

        ProducerBatch tp0First = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch tp1First = writeIdempotentBatchWithValue(transactionManager, tp1, "1");

        transactionManager.handleCompletedBatch(
                tp0First, new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L));
        transactionManager.handleCompletedBatch(
                tp1First, new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L));

        ProducerBatch tp0Second = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        ProducerBatch tp1Second = writeIdempotentBatchWithValue(transactionManager, tp1, "2");
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp1));

        // tp0's first batch is now reported with UNKNOWN_PRODUCER_ID and must be retriable.
        Assertions.assertTrue(transactionManager.canRetry(
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 400L),
                tp0First));

        // tp1's first batch is reported as completed once more before the epoch bump.
        transactionManager.handleCompletedBatch(
                tp1First, new ProduceResponse.PartitionResponse(Errors.NONE, -1L, -1L, 400L));

        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

        Assertions.assertEquals(1, transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(tp0Second, transactionManager.nextBatchBySequence(tp0));
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp1));
        Assertions.assertEquals(tp1Second, transactionManager.nextBatchBySequence(tp1));
    }

    /**
     * Requests an epoch bump for tp1 while two tp0 batches are outstanding, re-initializes the
     * producer id, and then completes the tp0 batches one by one. tp0's sequence state is
     * expected to be kept until its last outstanding batch completes and
     * maybeUpdateProducerIdAndEpoch() runs, after which it is expected to be cleared.
     */
    @Test
    public void testBatchCompletedAfterProducerReset() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, Short.MAX_VALUE);

        ProducerBatch tp0First = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        writeIdempotentBatchWithValue(transactionManager, tp1, "1");
        ProducerBatch tp0Second = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp0));

        // The bump is requested for tp1 only; then a new producer id/epoch is installed.
        transactionManager.requestEpochBumpForPartition(tp1);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        initializeIdempotentProducerId(13132L, (short) 0);

        // First tp0 batch completes: tp0 still carries its old sequence state and epoch.
        transactionManager.handleCompletedBatch(
                tp0First,
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L));
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(0, transactionManager.lastAckedSequence(tp0).getAsInt());
        Assertions.assertEquals(tp0Second, transactionManager.nextBatchBySequence(tp0));
        Assertions.assertEquals(Short.MAX_VALUE, transactionManager.nextBatchBySequence(tp0).producerEpoch());

        // Second tp0 batch completes: after the update call, tp0's state is cleared.
        transactionManager.handleCompletedBatch(
                tp0Second,
                new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L));
        transactionManager.maybeUpdateProducerIdAndEpoch(tp0);
        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp0));
        Assertions.assertFalse(transactionManager.lastAckedSequence(tp0).isPresent());
        Assertions.assertNull(transactionManager.nextBatchBySequence(tp0));
    }

    /**
     * Sends one record through a real {@code Sender}/{@code RecordAccumulator} pair, lets it time out,
     * and checks that the producer epoch moves from 1 to 2 while the tp0 sequence number restarts at 0.
     * A second record is then appended and must still be in flight (future not done) after a further
     * clock advance.
     */
    @Test
    public void testDuplicateSequenceAfterProducerReset() throws Exception {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(13131L, (short) 1);
        Metrics metrics = new Metrics(this.time);
        // NOTE(review): 15000 (accumulator) and 10000 (sender) match the 10000 + 5000 ms clock advances
        // below; presumably these are the delivery and request timeouts -- confirm against the constructors.
        RecordAccumulator recordAccumulator = new RecordAccumulator(this.logContext, 16384, CompressionType.NONE, 0, 0L, 0L, 15000, metrics, "", this.time, this.apiVersions, this.transactionManager, new BufferPool(1048576L, 16384, metrics, this.time, ""));
        Sender sender = new Sender(this.logContext, this.client, this.metadata, recordAccumulator, false, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, 10000, 0L, this.transactionManager, this.apiVersions);
        Assertions.assertEquals(0, this.transactionManager.sequenceNumber(this.tp0));
        long milliseconds = this.time.milliseconds();
        // First record: after one sender pass the next sequence for tp0 is 1.
        FutureRecordMetadata futureRecordMetadata = recordAccumulator.append(this.tp0.topic(), this.tp0.partition(), milliseconds, "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, false, milliseconds, TestUtils.singletonCluster()).future;
        sender.runOnce();
        Assertions.assertEquals(1, this.transactionManager.sequenceNumber(this.tp0));
        // Advance the mock clock by 10000 ms: the client no longer has the request in flight, but the
        // transaction manager still tracks the batch and the sequence number is unchanged.
        this.time.sleep(10000L);
        sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertTrue(this.transactionManager.hasInflightBatches(this.tp0));
        Assertions.assertEquals(1, this.transactionManager.sequenceNumber(this.tp0));
        // The next pass puts a request back in flight without consuming a new sequence number.
        sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertTrue(this.transactionManager.hasInflightBatches(this.tp0));
        Assertions.assertEquals(1, this.transactionManager.sequenceNumber(this.tp0));
        // A further 5000 ms (15000 ms in total) fails the first future with a TimeoutException, while
        // the client still reports one request in flight.
        this.time.sleep(5000L);
        sender.runOnce();
        Assertions.assertTrue(futureRecordMetadata.isDone());
        TestUtils.assertFutureThrows(futureRecordMetadata, TimeoutException.class);
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        // One more pass: the epoch is now 2 (was 1) and the tp0 sequence is back at 0.
        sender.runOnce();
        Assertions.assertEquals(2, this.transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(0, this.transactionManager.sequenceNumber(this.tp0));
        long milliseconds2 = this.time.milliseconds();
        // Second record, appended after the epoch change; it is expected to start from sequence 0.
        FutureRecordMetadata futureRecordMetadata2 = recordAccumulator.append(this.tp0.topic(), this.tp0.partition(), milliseconds2, "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, false, milliseconds2, TestUtils.singletonCluster()).future;
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(0, this.transactionManager.firstInFlightSequence(this.tp0));
        Assertions.assertEquals(1, this.transactionManager.sequenceNumber(this.tp0));
        // After another 5000 ms the second batch must still be tracked and its future still pending.
        this.time.sleep(5000L);
        sender.runOnce();
        Assertions.assertTrue(this.transactionManager.hasInflightBatches(this.tp0));
        Assertions.assertFalse(futureRecordMetadata2.isDone());
    }

    /**
     * Creates a batch holding {@code str} for {@code topicPartition}, stamps it with the manager's current
     * producer id/epoch and the partition's pre-increment sequence number, registers it as in flight
     * and closes it.
     *
     * @param transactionManager manager whose sequence number for the partition is advanced by one
     * @param topicPartition partition the batch is addressed to
     * @param str value of the record placed in the batch
     * @return the closed batch
     */
    private ProducerBatch writeIdempotentBatchWithValue(TransactionManager transactionManager, TopicPartition topicPartition, String str) {
        transactionManager.maybeUpdateProducerIdAndEpoch(topicPartition);

        // Read the sequence before bumping it: the batch carries the value from before the increment.
        final int baseSequence = transactionManager.sequenceNumber(topicPartition);
        transactionManager.incrementSequenceNumber(topicPartition, 1);

        final ProducerBatch batch = batchWithValue(topicPartition, str);
        batch.setProducerState(transactionManager.producerIdAndEpoch(), baseSequence, false);
        transactionManager.addInFlightBatch(batch);
        batch.close();
        return batch;
    }

    /**
     * Builds a {@link ProducerBatch} backed by a 64-byte buffer and appends one record with an empty key
     * and {@code str} as its value, using the test clock's current time for every timestamp.
     *
     * @param topicPartition partition the batch is created for
     * @param str record value
     * @return the (still open) batch
     */
    private ProducerBatch batchWithValue(TopicPartition topicPartition, String str) {
        final ByteBuffer buffer = ByteBuffer.allocate(64);
        final MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        final long nowMs = this.time.milliseconds();

        final ProducerBatch batch = new ProducerBatch(topicPartition, recordsBuilder, nowMs);
        final byte[] emptyKey = new byte[0];
        final Header[] noHeaders = new Header[0];
        final Callback noCallback = null;
        batch.tryAppend(nowMs, emptyKey, str.getBytes(), noHeaders, noCallback, nowMs);
        return batch;
    }

    /**
     * Checks that the tp0 sequence number wraps around rather than overflowing: starting from 0,
     * adding {@code MAX_RETRIES} yields {@code MAX_RETRIES}, adding 100 more yields 99, and adding
     * {@code MAX_RETRIES} again yields 98.
     */
    @Test
    public void testSequenceNumberOverflow() {
        initializeTransactionManager(Optional.empty());
        // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        Assertions.assertEquals(0, this.transactionManager.sequenceNumber(this.tp0));
        // NOTE(review): MAX_RETRIES is used here purely as a large increment; the expected values below
        // only work out if it equals Integer.MAX_VALUE -- confirm against its declaration.
        this.transactionManager.incrementSequenceNumber(this.tp0, MAX_RETRIES);
        Assertions.assertEquals(MAX_RETRIES, this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp0, 100);
        Assertions.assertEquals(99, this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp0, MAX_RETRIES);
        Assertions.assertEquals(98, this.transactionManager.sequenceNumber(this.tp0));
    }

    /**
     * With the producer epoch initialised to {@code Short.MAX_VALUE}, requests an epoch bump for tp0 and
     * checks that afterwards the tp0 sequence number is back at 0 while the tp1 sequence number is
     * still 3.
     */
    @Test
    public void testProducerIdReset() {
        initializeTransactionManager(Optional.empty());
        initializeIdempotentProducerId(15L, Short.MAX_VALUE);
        // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        Assertions.assertEquals(0, this.transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals(0, this.transactionManager.sequenceNumber(this.tp1));
        this.transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assertions.assertEquals(3, this.transactionManager.sequenceNumber(this.tp0));
        this.transactionManager.incrementSequenceNumber(this.tp1, 3);
        Assertions.assertEquals(3, this.transactionManager.sequenceNumber(this.tp1));
        // Only tp0 is flagged for the bump; tp1 is expected to keep its sequence.
        this.transactionManager.requestEpochBumpForPartition(this.tp0);
        this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(0, this.transactionManager.sequenceNumber(this.tp0));
        Assertions.assertEquals(3, this.transactionManager.sequenceNumber(this.tp1));
    }

    /**
     * Walks through a complete successful transaction: add tp0 to the transaction, produce one record,
     * send consumer offsets for tp1 (AddOffsetsToTxn, group coordinator lookup, TxnOffsetCommit), then
     * commit. Checks the manager's observable state after each step.
     */
    @Test
    public void testBasicTransaction() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata appendToAccumulator = appendToAccumulator(this.tp0);
        Assertions.assertFalse(appendToAccumulator.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        // Until the AddPartitionsToTxn round trip completes, tp0 is neither part of the transaction
        // nor allowed to be sent to.
        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assertions.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        runUntil(() -> this.transactionManager.transactionContainsPartition(this.tp0));
        Assertions.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assertions.assertFalse(appendToAccumulator.isDone());
        runUntil(appendToAccumulator::isDone);

        // Parameterized maps instead of raw HashMap so the compiler checks key/value types.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("myConsumerGroup"));
        Assertions.assertFalse(this.transactionManager.hasPendingOffsetCommits());
        prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        runUntil(this.transactionManager::hasPendingOffsetCommits);
        // The result must not complete before the TxnOffsetCommit response has been handled.
        Assertions.assertFalse(sendOffsetsToTransaction.isCompleted());

        Map<TopicPartition, Errors> txnOffsetCommitErrors = new HashMap<>();
        txnOffsetCommitErrors.put(this.tp1, Errors.NONE);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, txnOffsetCommitErrors);
        Assertions.assertNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);
        Assertions.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        runUntil(() -> !this.transactionManager.hasPendingOffsetCommits());
        Assertions.assertTrue(sendOffsetsToTransaction.isCompleted());

        this.transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(() -> !this.transactionManager.hasOngoingTransaction());
        Assertions.assertFalse(this.transactionManager.isCompleting());
        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    /**
     * Starts {@code initializeTransactions()}, first waits until the transaction coordinator is unknown
     * after a FindCoordinator response prepared with its boolean flag set to {@code true}, then checks
     * that a second FindCoordinator response (flag {@code false}) resolves the coordinator to the broker node.
     */
    @Test
    public void testDisconnectAndRetry() {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        transactionManager.initializeTransactions();

        // NOTE(review): the 'true' flag presumably makes the mock client disconnect -- confirm in the helper.
        prepareFindCoordinatorResponse(Errors.NONE, true, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) == null);

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);

        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));
    }

    /**
     * After a completed initialisation (producer id 13131, epoch 1), calling
     * {@code initializeTransactions()} again must throw {@link IllegalStateException}.
     */
    @Test
    public void testInitializeTransactionsTwiceRaisesError() {
        final long producerId = 13131L;
        final short epoch = 1;

        doInitTransactions(producerId, epoch);
        Assertions.assertTrue(transactionManager.hasProducerId());

        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.initializeTransactions());
    }

    /**
     * Replies to the FindCoordinator request issued by {@code initializeTransactions()} with an
     * unsupported-version response and checks that the manager ends up with a fatal
     * {@code UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedFindCoordinator() {
        this.transactionManager.initializeTransactions();
        this.client.prepareUnsupportedVersionResponse(request -> {
            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) request;
            // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
            Assertions.assertEquals(FindCoordinatorRequest.CoordinatorType.TRANSACTION, FindCoordinatorRequest.CoordinatorType.forId(findCoordinatorRequest.data().keyType()));
            Assertions.assertEquals("foobar", findCoordinatorRequest.data().key());
            return true;
        });
        runUntil(this.transactionManager::hasFatalError);
        Assertions.assertTrue(this.transactionManager.hasFatalError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * Lets the coordinator lookup succeed, then replies to the following InitProducerId request with an
     * unsupported-version response and checks that the manager ends up with a fatal
     * {@code UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedInitTransactions() {
        this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertFalse(this.transactionManager.hasError());
        this.client.prepareUnsupportedVersionResponse(request -> {
            InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) request;
            // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
            Assertions.assertEquals("foobar", initProducerIdRequest.data().transactionalId());
            Assertions.assertEquals(1121, initProducerIdRequest.data().transactionTimeoutMs());
            return true;
        });
        runUntil(this.transactionManager::hasFatalError);
        Assertions.assertTrue(this.transactionManager.hasFatalError());
        Assertions.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * A TxnOffsetCommit response carrying {@code UNSUPPORTED_FOR_MESSAGE_FORMAT} must fail the pending
     * send-offsets result with {@link UnsupportedForMessageFormatException} and be reported as a fatal error.
     */
    @Test
    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
        final String groupId = "myConsumerGroup";
        final long producerId = 13131L;
        final short epoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();

        final TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(fooPartition, new OffsetAndMetadata(39L)),
                new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, producerId, epoch);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        prepareTxnOffsetCommitResponse(groupId, producerId, epoch,
                Collections.singletonMap(fooPartition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));

        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof UnsupportedForMessageFormatException);
        assertFatalError(UnsupportedForMessageFormatException.class);
    }

    /**
     * Sends offsets with group metadata that includes a group instance id, answers the TxnOffsetCommit
     * with {@code FENCED_INSTANCE_ID}, and checks that the result fails with
     * {@link FencedInstanceIdException} as an abortable error.
     */
    @Test
    public void testFencedInstanceIdInTxnOffsetCommitByGroupMetadata() {
        final String groupId = "myConsumerGroup";
        final long producerId = 13131L;
        final short epoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();

        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId, 5, "fenced_member", Optional.of("instance"));
        final TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(fooPartition, new OffsetAndMetadata(39L)), groupMetadata);

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, producerId, epoch);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        runUntil(() -> transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);

        final AbstractResponse fencedResponse = new TxnOffsetCommitResponse(0, Collections.singletonMap(fooPartition, Errors.FENCED_INSTANCE_ID));
        client.prepareResponse(request -> {
            final TxnOffsetCommitRequest commitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals(groupId, commitRequest.data().groupId());
            Assertions.assertEquals(producerId, commitRequest.data().producerId());
            Assertions.assertEquals(epoch, commitRequest.data().producerEpoch());
            // Matches only when the instance id was forwarded and the member id is not "member".
            if (!commitRequest.data().groupInstanceId().equals("instance")) {
                return false;
            }
            return !commitRequest.data().memberId().equals("member");
        }, fencedResponse);

        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof FencedInstanceIdException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof FencedInstanceIdException);
        assertAbortableError(FencedInstanceIdException.class);
    }

    /**
     * Answers the TxnOffsetCommit with {@code UNKNOWN_MEMBER_ID} and checks that the send-offsets result
     * fails with {@link CommitFailedException} as an abortable error.
     */
    @Test
    public void testUnknownMemberIdInTxnOffsetCommitByGroupMetadata() {
        final String groupId = "myConsumerGroup";
        final long producerId = 13131L;
        final short epoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();

        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId, 5, "unknownMember", Optional.empty());
        final TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(fooPartition, new OffsetAndMetadata(39L)), groupMetadata);

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, producerId, epoch);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        runUntil(() -> transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);

        final AbstractResponse unknownMemberResponse = new TxnOffsetCommitResponse(0, Collections.singletonMap(fooPartition, Errors.UNKNOWN_MEMBER_ID));
        client.prepareResponse(request -> {
            final TxnOffsetCommitRequest commitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals(groupId, commitRequest.data().groupId());
            Assertions.assertEquals(producerId, commitRequest.data().producerId());
            Assertions.assertEquals(epoch, commitRequest.data().producerEpoch());
            // Matches only when the request's member id is not "member".
            return !commitRequest.data().memberId().equals("member");
        }, unknownMemberResponse);

        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof CommitFailedException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof CommitFailedException);
        assertAbortableError(CommitFailedException.class);
    }

    /**
     * Sends offsets with generation id 1, answers the TxnOffsetCommit with {@code ILLEGAL_GENERATION},
     * and checks that the send-offsets result fails with {@link CommitFailedException} as an abortable error.
     */
    @Test
    public void testIllegalGenerationInTxnOffsetCommitByGroupMetadata() {
        final String groupId = "myConsumerGroup";
        final long producerId = 13131L;
        final short epoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();

        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId, 1, "", Optional.empty());
        final TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(fooPartition, new OffsetAndMetadata(39L)), groupMetadata);

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, producerId, epoch);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        runUntil(() -> transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP) != null);

        // Two ILLEGAL_GENERATION responses are queued: one through the helper, one with an explicit matcher.
        prepareTxnOffsetCommitResponse(groupId, producerId, epoch, Collections.singletonMap(fooPartition, Errors.ILLEGAL_GENERATION));
        final AbstractResponse illegalGenerationResponse = new TxnOffsetCommitResponse(0, Collections.singletonMap(fooPartition, Errors.ILLEGAL_GENERATION));
        client.prepareResponse(request -> {
            final TxnOffsetCommitRequest commitRequest = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals(groupId, commitRequest.data().groupId());
            Assertions.assertEquals(producerId, commitRequest.data().producerId());
            Assertions.assertEquals(epoch, commitRequest.data().producerEpoch());
            // Matches only when the request's generation id differs from 5.
            return commitRequest.data().generationId() != 5;
        }, illegalGenerationResponse);

        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof CommitFailedException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof CommitFailedException);
        assertAbortableError(CommitFailedException.class);
    }

    /**
     * Resolves the transaction coordinator, then prepares an InitProducerId response with its boolean
     * flag set to {@code true} and checks that the coordinator becomes unknown and initialisation stays
     * incomplete. After a second lookup and a normal InitProducerId response the result completes with
     * producer id 13131 and epoch 1.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectAfterSend() {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final long producerId = 13131L;
        final short epoch = 1;

        final TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));

        // NOTE(review): the 'true' flag presumably simulates a disconnect on this response -- confirm in the helper.
        prepareInitPidResponse(Errors.NONE, true, producerId, epoch);
        runUntil(() -> transactionManager.coordinator(txnType) == null);
        Assertions.assertNull(transactionManager.coordinator(txnType));
        Assertions.assertFalse(initResult.isCompleted());
        Assertions.assertFalse(transactionManager.hasProducerId());

        // Second round: look the coordinator up again and let InitProducerId go through.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assertions.assertFalse(initResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
        runUntil(initResult::isCompleted);

        Assertions.assertTrue(initResult.isCompleted());
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Resolves the transaction coordinator, then disconnects the broker node on the mock client (with a
     * 100 ms backoff) and checks that the coordinator becomes unknown and initialisation stays
     * incomplete. After the clock moves past the backoff, a second lookup and an InitProducerId
     * response complete the initialisation with producer id 13131 and epoch 1.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectBeforeSend() {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final long producerId = 13131L;
        final short epoch = 1;

        final TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));

        client.disconnect(brokerNode.idString());
        client.backoff(brokerNode, 100L);
        runUntil(() -> transactionManager.coordinator(txnType) == null);
        // Advance the clock by 110 ms, i.e. beyond the 100 ms backoff set above.
        time.sleep(110L);
        Assertions.assertFalse(initResult.isCompleted());
        Assertions.assertFalse(transactionManager.hasProducerId());

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assertions.assertFalse(initResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
        runUntil(initResult::isCompleted);

        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Answers the first InitProducerId request with {@code NOT_COORDINATOR} and checks that the
     * coordinator becomes unknown and initialisation stays incomplete. After a second lookup and a
     * successful InitProducerId response the manager holds producer id 13131 and epoch 1.
     */
    @Test
    public void testLookupCoordinatorOnNotCoordinatorError() {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        final long producerId = 13131L;
        final short epoch = 1;

        final TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));

        prepareInitPidResponse(Errors.NOT_COORDINATOR, false, producerId, epoch);
        runUntil(() -> transactionManager.coordinator(txnType) == null);
        Assertions.assertFalse(initResult.isCompleted());
        Assertions.assertFalse(transactionManager.hasProducerId());

        // Second round: look the coordinator up again and let InitProducerId succeed.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnType, "foobar");
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(txnType));
        Assertions.assertFalse(initResult.isCompleted());

        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
        runUntil(initResult::isCompleted);

        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * A FindCoordinator response carrying {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} must put the
     * manager into a fatal error state and make awaiting the initialisation result throw
     * {@link TransactionalIdAuthorizationException}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
        final TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        prepareFindCoordinatorResponse(
                Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                false,
                FindCoordinatorRequest.CoordinatorType.TRANSACTION,
                "foobar");
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.hasFatalError());
        Assertions.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assertions.assertFalse(initResult.isSuccessful());
        Assertions.assertThrows(TransactionalIdAuthorizationException.class, initResult::await);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Lets the coordinator lookup succeed, answers InitProducerId with
     * {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED}, and checks that the initialisation result completes
     * unsuccessfully, that awaiting it throws {@link TransactionalIdAuthorizationException}, and that the
     * failure is treated as fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
        TransactionalRequestResult initializeTransactions = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, 13131L, (short) -1);
        runUntil(this.transactionManager::hasError);
        Assertions.assertTrue(initializeTransactions.isCompleted());
        Assertions.assertFalse(initializeTransactions.isSuccessful());
        Assertions.assertThrows(TransactionalIdAuthorizationException.class, initializeTransactions::await);
        // A transactional-id authorization failure is checked as fatal, matching the FindCoordinator and
        // AddOffsetsToTxn variants of this test; no transaction has been started here that could be aborted.
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * A group-coordinator lookup that fails with {@code GROUP_AUTHORIZATION_FAILED} must fail the
     * send-offsets result with a {@link GroupAuthorizationException} naming the consumer group, and be
     * reported as an abortable error.
     */
    @Test
    public void testGroupAuthorizationFailureInFindCoordinator() {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("myConsumerGroup"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        runUntil(() -> !this.transactionManager.hasPartitionsToAdd());
        prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        runUntil(this.transactionManager::hasError);
        Assertions.assertTrue(this.transactionManager.lastError() instanceof GroupAuthorizationException);
        runUntil(sendOffsetsToTransaction::isCompleted);
        Assertions.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assertions.assertTrue(sendOffsetsToTransaction.error() instanceof GroupAuthorizationException);
        // groupId() is specific to GroupAuthorizationException, so narrow the error before calling it;
        // the instanceof assertion above guarantees the cast succeeds.
        GroupAuthorizationException groupAuthorizationException = (GroupAuthorizationException) sendOffsetsToTransaction.error();
        Assertions.assertEquals("myConsumerGroup", groupAuthorizationException.groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * A TxnOffsetCommit response carrying {@code GROUP_AUTHORIZATION_FAILED} must fail the send-offsets
     * result with a {@link GroupAuthorizationException} naming the consumer group, leave no pending
     * offset commits, and be reported as an abortable error.
     */
    @Test
    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsToTransaction = this.transactionManager.sendOffsetsToTransaction(Collections.singletonMap(topicPartition, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata("myConsumerGroup"));
        prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        runUntil(() -> !this.transactionManager.hasPartitionsToAdd());
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, Collections.singletonMap(topicPartition, Errors.GROUP_AUTHORIZATION_FAILED));
        runUntil(this.transactionManager::hasError);
        Assertions.assertTrue(this.transactionManager.lastError() instanceof GroupAuthorizationException);
        Assertions.assertTrue(sendOffsetsToTransaction.isCompleted());
        Assertions.assertFalse(sendOffsetsToTransaction.isSuccessful());
        Assertions.assertTrue(sendOffsetsToTransaction.error() instanceof GroupAuthorizationException);
        Assertions.assertFalse(this.transactionManager.hasPendingOffsetCommits());
        // groupId() is specific to GroupAuthorizationException, so narrow the error before calling it;
        // the instanceof assertion above guarantees the cast succeeds.
        GroupAuthorizationException groupAuthorizationException = (GroupAuthorizationException) sendOffsetsToTransaction.error();
        Assertions.assertEquals("myConsumerGroup", groupAuthorizationException.groupId());
        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * An AddOffsetsToTxn response carrying {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} must fail the
     * send-offsets result with {@link TransactionalIdAuthorizationException} and be reported as a fatal error.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
        final String groupId = "myConsumerGroup";
        final long producerId = 13131L;
        final short epoch = 1;
        final TopicPartition fooPartition = new TopicPartition("foo", 0);

        doInitTransactions();
        transactionManager.beginTransaction();

        final TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(fooPartition, new OffsetAndMetadata(39L)),
                new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, groupId, producerId, epoch);
        runUntil(transactionManager::hasError);

        Assertions.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Checks that an {@code INVALID_TXN_STATE} response to AddOffsetsToTxn completes the
     * send-offsets result unsuccessfully with an {@link InvalidTxnStateException} and satisfies
     * {@code assertFatalError}.
     */
    @Test
    public void testInvalidTxnStateFailureInAddOffsetsToTxn() {
        String groupId = "myConsumerGroup";
        TopicPartition offsetsPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(offsetsPartition, new OffsetAndMetadata(39L)),
                new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.INVALID_TXN_STATE, groupId, 13131L, (short) 1);
        runUntil(this.transactionManager::hasError);

        Assertions.assertTrue(this.transactionManager.lastError() instanceof InvalidTxnStateException);
        Assertions.assertTrue(sendOffsetsResult.isCompleted());
        Assertions.assertFalse(sendOffsetsResult.isSuccessful());
        Assertions.assertTrue(sendOffsetsResult.error() instanceof InvalidTxnStateException);
        assertFatalError(InvalidTxnStateException.class);
    }

    /**
     * Checks that a {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} partition error in the
     * TxnOffsetCommit response (after a successful AddOffsetsToTxn and group coordinator lookup)
     * completes the send-offsets result unsuccessfully with a
     * {@link TransactionalIdAuthorizationException} and satisfies {@code assertFatalError}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
        String groupId = "myConsumerGroup";
        TopicPartition offsetsPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = this.transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(offsetsPartition, new OffsetAndMetadata(39L)),
                new ConsumerGroupMetadata(groupId));

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, 13131L, (short) 1);
        runUntil(() -> !this.transactionManager.hasPartitionsToAdd());

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        prepareTxnOffsetCommitResponse(groupId, 13131L, (short) 1,
                Collections.singletonMap(offsetsPartition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
        runUntil(this.transactionManager::hasError);

        Assertions.assertTrue(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assertions.assertTrue(sendOffsetsResult.isCompleted());
        Assertions.assertFalse(sendOffsetsResult.isSuccessful());
        Assertions.assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Verifies handling of a {@code TOPIC_AUTHORIZATION_FAILED} error in AddPartitionsToTxn.
     *
     * <p>Two partitions are added and written to; the response marks "foo" as unauthorized and
     * "bar" as {@code OPERATION_NOT_ATTEMPTED}. The test expects a
     * {@link TopicAuthorizationException} listing only the "foo" topic, neither partition to be
     * pending or added, {@code assertAbortableError} to pass, and both produce futures to fail
     * with a {@link KafkaException} after one more sender pass.
     *
     * @throws InterruptedException declared by the test signature; presumably propagated from the
     *         accumulator helper or {@code TestUtils.assertFutureThrows} (not visible here)
     */
    @Test
    public void testTopicAuthorizationFailureInAddPartitions() throws InterruptedException {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        TopicPartition notAttemptedPartition = new TopicPartition("bar", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(unauthorizedPartition);
        this.transactionManager.maybeAddPartition(notAttemptedPartition);
        FutureRecordMetadata unauthorizedSend = appendToAccumulator(unauthorizedPartition);
        FutureRecordMetadata notAttemptedSend = appendToAccumulator(notAttemptedPartition);

        // Typed map instead of a raw HashMap so the error-per-partition contract is compiler-checked.
        Map<TopicPartition, Errors> errorsByPartition = new HashMap<>();
        errorsByPartition.put(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        errorsByPartition.put(notAttemptedPartition, Errors.OPERATION_NOT_ATTEMPTED);
        prepareAddPartitionsToTxn(errorsByPartition);
        runUntil(this.transactionManager::hasError);

        Assertions.assertTrue(this.transactionManager.lastError() instanceof TopicAuthorizationException);
        Assertions.assertFalse(this.transactionManager.isPartitionPendingAdd(unauthorizedPartition));
        Assertions.assertFalse(this.transactionManager.isPartitionPendingAdd(notAttemptedPartition));
        Assertions.assertFalse(this.transactionManager.isPartitionAdded(unauthorizedPartition));
        Assertions.assertFalse(this.transactionManager.isPartitionAdded(notAttemptedPartition));
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());
        // The cast exposes unauthorizedTopics(); the instanceof assertion above guards it.
        TopicAuthorizationException authorizationError =
                (TopicAuthorizationException) this.transactionManager.lastError();
        Assertions.assertEquals(Collections.singleton(unauthorizedPartition.topic()), authorizationError.unauthorizedTopics());
        assertAbortableError(TopicAuthorizationException.class);

        this.sender.runOnce();
        TestUtils.assertFutureThrows(unauthorizedSend, KafkaException.class);
        TestUtils.assertFutureThrows(notAttemptedSend, KafkaException.class);
    }

    /**
     * Verifies a commit that is started while AddPartitionsToTxn is still in flight and then
     * receives a {@code TOPIC_AUTHORIZATION_FAILED} response.
     *
     * <p>After the first sender pass nothing has failed yet. Once the error response is delivered
     * the manager reports an error, but the commit result and produce futures are only completed
     * on the following sender pass; the commit then carries a {@link TopicAuthorizationException}
     * and both produce futures fail with a {@link KafkaException}.
     *
     * @throws InterruptedException declared by the test signature; presumably propagated from the
     *         accumulator helper or {@code TestUtils.assertFutureThrows} (not visible here)
     */
    @Test
    public void testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() throws InterruptedException {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        TopicPartition notAttemptedPartition = new TopicPartition("bar", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(unauthorizedPartition);
        this.transactionManager.maybeAddPartition(notAttemptedPartition);
        FutureRecordMetadata unauthorizedSend = appendToAccumulator(unauthorizedPartition);
        FutureRecordMetadata notAttemptedSend = appendToAccumulator(notAttemptedPartition);

        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        this.sender.runOnce();
        Assertions.assertFalse(this.transactionManager.hasError());
        Assertions.assertFalse(commitResult.isCompleted());
        Assertions.assertFalse(unauthorizedSend.isDone());

        // Typed map / diamond sets replace the raw HashMap and HashSet usages.
        Map<TopicPartition, Errors> errorsByPartition = new HashMap<>();
        errorsByPartition.put(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        errorsByPartition.put(notAttemptedPartition, Errors.OPERATION_NOT_ATTEMPTED);
        this.client.respond(request -> {
            // The in-flight request must cover exactly the partitions we are about to answer for.
            Assertions.assertEquals(
                    new HashSet<>(getPartitionsFromV3Request((AddPartitionsToTxnRequest) request)),
                    new HashSet<>(errorsByPartition.keySet()));
            return true;
        }, (AbstractResponse) new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData()
                .setResultsByTopicV3AndBelow(AddPartitionsToTxnResponse.resultForTransaction("", errorsByPartition).topicResults())
                .setThrottleTimeMs(0)));

        this.sender.runOnce();
        Assertions.assertTrue(this.transactionManager.hasError());
        Assertions.assertFalse(commitResult.isCompleted());
        Assertions.assertFalse(unauthorizedSend.isDone());
        Assertions.assertFalse(notAttemptedSend.isDone());

        this.sender.runOnce();
        Assertions.assertTrue(commitResult.isCompleted());
        TestUtils.assertFutureThrows(unauthorizedSend, KafkaException.class);
        TestUtils.assertFutureThrows(notAttemptedSend, KafkaException.class);
        Assertions.assertTrue(commitResult.error() instanceof TopicAuthorizationException);
    }

    /**
     * Exercises recovery from an abortable error when the only AddPartitionsToTxn request of the
     * transaction was rejected with {@code TOPIC_AUTHORIZATION_FAILED}.
     *
     * <p>The abort is driven to completion without queuing an EndTxn response, the failed produce
     * future is checked, and a fresh transaction on {@code tp0} is then produced to and committed.
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(unauthorizedPartition);
        FutureRecordMetadata failedSend = appendToAccumulator(unauthorizedPartition);
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertTrue(this.transactionManager.hasAbortableError());

        // Abort: note that no EndTxn response is prepared for this phase.
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        runUntil(failedSend::isDone);
        assertProduceFutureFailed(failedSend);
        runUntil(this.transactionManager::isReady);
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(this.accumulator.hasIncomplete());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();

        // A new transaction on tp0 must work end to end.
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata recoveredSend = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());

        this.transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(recoveredSend::isDone);
        Assertions.assertNotNull(recoveredSend.get());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
    }

    /**
     * Covers retrying an abort whose first {@code await} timed out.
     *
     * <p>After the zero-timeout await throws {@link TimeoutException}, the abort finishes
     * successfully but is not yet acked. Until the caller retrieves the same result via
     * {@code beginAbort()} and awaits it, other transactional calls are expected to throw
     * {@link IllegalStateException}; afterwards a new transaction can begin.
     */
    @Test
    public void testRetryAbortTransactionAfterTimeout() throws Exception {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        appendToAccumulator(this.tp0);
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        Assertions.assertThrows(TimeoutException.class, () -> abortResult.await(0L, TimeUnit.MILLISECONDS));

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertFalse(abortResult.isAcked());
        Assertions.assertFalse(this.transactionManager.hasOngoingTransaction());

        // While the completed abort is un-acked, every other entry point is rejected.
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::initializeTransactions);
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginTransaction);
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginCommit);
        Assertions.assertThrows(IllegalStateException.class, () -> this.transactionManager.maybeAddPartition(this.tp0));

        // Retrying the abort hands back the very same result object.
        Assertions.assertSame(abortResult, this.transactionManager.beginAbort());
        abortResult.await();
        this.transactionManager.beginTransaction();
        Assertions.assertTrue(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Covers retrying a commit whose first {@code await} timed out.
     *
     * <p>After the zero-timeout await throws {@link TimeoutException}, the commit finishes
     * successfully but is not yet acked. Until the caller retrieves the same result via
     * {@code beginCommit()} and awaits it, other transactional calls are expected to throw
     * {@link IllegalStateException}; afterwards a new transaction can begin.
     */
    @Test
    public void testRetryCommitTransactionAfterTimeout() throws Exception {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        appendToAccumulator(this.tp0);
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));

        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        Assertions.assertThrows(TimeoutException.class, () -> commitResult.await(0L, TimeUnit.MILLISECONDS));

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
        Assertions.assertTrue(commitResult.isSuccessful());
        Assertions.assertFalse(commitResult.isAcked());
        Assertions.assertFalse(this.transactionManager.hasOngoingTransaction());

        // While the completed commit is un-acked, every other entry point is rejected.
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::initializeTransactions);
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginTransaction);
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginAbort);
        Assertions.assertThrows(IllegalStateException.class, () -> this.transactionManager.maybeAddPartition(this.tp0));

        // Retrying the commit hands back the very same result object.
        Assertions.assertSame(commitResult, this.transactionManager.beginCommit());
        commitResult.await();
        this.transactionManager.beginTransaction();
        Assertions.assertTrue(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Covers retrying {@code initializeTransactions()} after its first {@code await} timed out.
     *
     * <p>Once the producer id arrives the result is successful but not acked; other transactional
     * calls throw {@link IllegalStateException} until the same result is fetched again and
     * awaited. After that, a further {@code initializeTransactions()} is rejected and a
     * transaction can begin.
     */
    @Test
    public void testRetryInitTransactionsAfterTimeout() {
        TransactionalRequestResult initResult = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
        Assertions.assertThrows(TimeoutException.class, () -> initResult.await(0L, TimeUnit.MILLISECONDS));

        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        runUntil(this.transactionManager::hasProducerId);
        Assertions.assertTrue(initResult.isSuccessful());
        Assertions.assertFalse(initResult.isAcked());

        // While the completed initialization is un-acked, every other entry point is rejected.
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginTransaction);
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginAbort);
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::beginCommit);
        Assertions.assertThrows(IllegalStateException.class, () -> this.transactionManager.maybeAddPartition(this.tp0));

        // Retrying returns the same result object, and awaiting it marks it acked.
        Assertions.assertSame(initResult, this.transactionManager.initializeTransactions());
        initResult.await();
        Assertions.assertTrue(initResult.isAcked());
        Assertions.assertThrows(IllegalStateException.class, this.transactionManager::initializeTransactions);
        this.transactionManager.beginTransaction();
        Assertions.assertTrue(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Exercises recovery from an abortable error after {@code tp0} was already added to the
     * transaction and a second partition ("foo") is then rejected with
     * {@code TOPIC_AUTHORIZATION_FAILED}.
     *
     * <p>The abort is answered with an EndTxn ABORT response, both outstanding produce futures are
     * checked with {@code assertProduceFutureFailed}, and a fresh transaction on {@code tp0} is
     * then produced to and committed.
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        // NOTE(review): this first record is appended to "foo", not tp0; kept exactly as in the original.
        FutureRecordMetadata firstSend = appendToAccumulator(unauthorizedPartition);
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));

        this.transactionManager.maybeAddPartition(unauthorizedPartition);
        FutureRecordMetadata secondSend = appendToAccumulator(unauthorizedPartition);
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(this.transactionManager::hasAbortableError);
        Assertions.assertTrue(this.transactionManager.isPartitionAdded(this.tp0));
        Assertions.assertFalse(this.transactionManager.isPartitionAdded(unauthorizedPartition));
        Assertions.assertFalse(firstSend.isDone());
        Assertions.assertFalse(secondSend.isDone());

        // Abort the failed transaction.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        runUntil(this.transactionManager::isReady);
        assertProduceFutureFailed(firstSend);
        assertProduceFutureFailed(secondSend);
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(this.accumulator.hasIncomplete());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();

        // A new transaction on tp0 must work end to end.
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata recoveredSend = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());

        this.transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(recoveredSend::isDone);
        Assertions.assertNotNull(recoveredSend.get());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
    }

    /**
     * Exercises recovery from an abortable error while a produce request to {@code tp0} is being
     * retried after a {@code REQUEST_TIMED_OUT} response.
     *
     * <p>A second partition ("foo") is rejected with {@code TOPIC_AUTHORIZATION_FAILED}; the
     * retried produce to {@code tp0} is then answered successfully while the "foo" future is
     * checked with {@code assertProduceFutureFailed}. The transaction is aborted, and a fresh
     * transaction on {@code tp0} is produced to and committed.
     */
    @Test
    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
        TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        FutureRecordMetadata retriedSend = appendToAccumulator(this.tp0);
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));

        // First produce attempt times out, leaving the batch incomplete.
        this.accumulator.beginFlush();
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short) 1);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertFalse(retriedSend.isDone());
        Assertions.assertTrue(this.accumulator.hasIncomplete());

        // Adding the unauthorized partition puts the manager into an abortable error.
        this.transactionManager.maybeAddPartition(unauthorizedPartition);
        FutureRecordMetadata unauthorizedSend = appendToAccumulator(unauthorizedPartition);
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        runUntil(this.transactionManager::hasAbortableError);
        Assertions.assertTrue(this.transactionManager.isPartitionAdded(this.tp0));
        Assertions.assertFalse(this.transactionManager.isPartitionAdded(unauthorizedPartition));
        Assertions.assertFalse(retriedSend.isDone());

        // The retried produce to tp0 is answered successfully.
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(retriedSend::isDone);
        assertProduceFutureFailed(unauthorizedSend);
        Assertions.assertNotNull(retriedSend.get());
        Assertions.assertTrue(retriedSend.isDone());

        // Abort the failed transaction.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        runUntil(this.transactionManager::isReady);
        Assertions.assertTrue(this.transactionManager.isReady());
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());
        Assertions.assertFalse(this.accumulator.hasIncomplete());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();

        // A new transaction on tp0 must work end to end.
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata recoveredSend = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxn(Collections.singletonMap(this.tp0, Errors.NONE));
        runUntil(() -> this.transactionManager.isPartitionAdded(this.tp0));
        Assertions.assertFalse(this.transactionManager.hasPartitionsToAdd());

        this.transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(recoveredSend::isDone);
        Assertions.assertNotNull(recoveredSend.get());
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
    }

    /**
     * Checks that a {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} response to AddPartitionsToTxn
     * surfaces as a {@link TransactionalIdAuthorizationException} and satisfies
     * {@code assertFatalError}.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
        TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(partition);

        prepareAddPartitionsToTxn(partition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
        runUntil(this.transactionManager::hasError);

        Assertions.assertTrue(this.transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * Checks that an {@code INVALID_TXN_STATE} response to AddPartitionsToTxn surfaces as an
     * {@link InvalidTxnStateException} and satisfies {@code assertFatalError}.
     */
    @Test
    public void testInvalidTxnStateInAddPartitions() {
        TopicPartition partition = new TopicPartition("foo", 0);
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(partition);

        prepareAddPartitionsToTxn(partition, Errors.INVALID_TXN_STATE);
        runUntil(this.transactionManager::hasError);

        Assertions.assertTrue(this.transactionManager.lastError() instanceof InvalidTxnStateException);
        assertFatalError(InvalidTxnStateException.class);
    }

    /**
     * Checks the ordering when a commit is requested before the partition has been added.
     *
     * <p>The test asserts that {@code tp0} is not yet part of the transaction when
     * {@code beginCommit()} returns, that the commit stays incomplete while the partition is
     * added and the record is produced, and that it only completes (ending the ongoing
     * transaction) after the EndTxn COMMIT response has been processed.
     */
    @Test
    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        Assertions.assertFalse(pendingSend.isDone());

        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> this.transactionManager.transactionContainsPartition(this.tp0));
        Assertions.assertFalse(pendingSend.isDone());
        Assertions.assertFalse(commitResult.isCompleted());

        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(pendingSend::isDone);

        // The EndTxn response is only queued here; the commit must still be pending until it is processed.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        Assertions.assertFalse(commitResult.isCompleted());
        Assertions.assertTrue(this.transactionManager.hasOngoingTransaction());
        Assertions.assertTrue(this.transactionManager.isCompleting());

        runUntil(commitResult::isCompleted);
        Assertions.assertFalse(this.transactionManager.hasOngoingTransaction());
    }

    /**
     * Checks that records appended to {@code tp0} stay unsent while a second partition
     * ({@code tp1}) is being added, and that both futures are done once the produce response
     * has been processed.
     */
    @Test
    public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata firstSend = appendToAccumulator(this.tp0);
        Assertions.assertFalse(firstSend.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        runUntil(() -> this.transactionManager.transactionContainsPartition(this.tp0));

        // Register tp1 while appending a second record to tp0 (as in the original test).
        this.transactionManager.maybeAddPartition(this.tp1);
        FutureRecordMetadata secondSend = appendToAccumulator(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp1, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp1));
        Assertions.assertFalse(firstSend.isDone());
        Assertions.assertFalse(secondSend.isDone());

        runUntil(() -> this.transactionManager.transactionContainsPartition(this.tp1));
        Assertions.assertFalse(firstSend.isDone());
        Assertions.assertFalse(secondSend.isDone());

        runUntil(firstSend::isDone);
        Assertions.assertTrue(secondSend.isDone());
    }

    /**
     * Runs a full init / add-partition / commit cycle in which each request type first receives
     * the given error and then {@code Errors.NONE}, and expects the commit to succeed.
     *
     * @param errors the error injected ahead of each successful response
     */
    @EnumSource(names = {"UNKNOWN_TOPIC_OR_PARTITION", "REQUEST_TIMED_OUT", "COORDINATOR_LOAD_IN_PROGRESS", "CONCURRENT_TRANSACTIONS"})
    @ParameterizedTest
    public void testRetriableErrors2(Errors errors) {
        TransactionalRequestResult initResult = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(errors, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        prepareInitPidResponse(errors, false, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        runUntil(this.transactionManager::hasProducerId);
        initResult.await();
        this.transactionManager.beginTransaction();

        // For AddPartitionsToTxn, CONCURRENT_TRANSACTIONS is swapped for COORDINATOR_LOAD_IN_PROGRESS
        // (the reason is not visible in this test).
        Errors addPartitionsError = errors;
        if (errors.equals(Errors.CONCURRENT_TRANSACTIONS)) {
            addPartitionsError = Errors.COORDINATOR_LOAD_IN_PROGRESS;
        }
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxnResponse(addPartitionsError, this.tp0, (short) 1, 13131L);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> this.transactionManager.transactionContainsPartition(this.tp0));

        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        prepareEndTxnResponse(errors, TransactionResult.COMMIT, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(commitResult::isCompleted);
        Assertions.assertTrue(commitResult.isSuccessful());
    }

    /**
     * Checks that initialization still completes when the first FindCoordinator response carries
     * {@code COORDINATOR_NOT_AVAILABLE} and the second one succeeds.
     */
    @Test
    public void testCoordinatorNotAvailable() {
        TransactionalRequestResult initResult = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 1);
        runUntil(this.transactionManager::hasProducerId);
        initResult.await();
    }

    /**
     * Runs the shared InitProducerId fencing scenario with {@code Errors.PRODUCER_FENCED}
     * as the injected error.
     */
    @Test
    public void testProducerFencedExceptionInInitProducerId() {
        verifyProducerFencedForInitProducerId(Errors.PRODUCER_FENCED);
    }

    /**
     * Runs the shared InitProducerId fencing scenario with {@code Errors.INVALID_PRODUCER_EPOCH};
     * the helper asserts that the failure is reported as a {@code ProducerFencedException}.
     */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInInitProducerId() {
        verifyProducerFencedForInitProducerId(Errors.INVALID_PRODUCER_EPOCH);
    }

    /**
     * Drives initialization up to the InitProducerId response, answers it with the given error,
     * and asserts that awaiting the result and every subsequent transactional call throws
     * {@link ProducerFencedException}.
     *
     * @param errors the error returned in the InitProducerId response
     */
    private void verifyProducerFencedForInitProducerId(Errors errors) {
        TransactionalRequestResult initResult = this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertEquals(this.brokerNode, this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        prepareInitPidResponse(errors, false, 13131L, (short) 1);
        runUntil(this.transactionManager::hasError);

        Assertions.assertThrows(ProducerFencedException.class, initResult::await);
        Assertions.assertThrows(ProducerFencedException.class, () -> this.transactionManager.beginTransaction());
        Assertions.assertThrows(ProducerFencedException.class, () -> this.transactionManager.beginCommit());
        Assertions.assertThrows(ProducerFencedException.class, () -> this.transactionManager.beginAbort());
        Assertions.assertThrows(ProducerFencedException.class,
                () -> this.transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
    }

    /**
     * Runs the shared AddPartitionsToTxn fencing scenario with a {@code PRODUCER_FENCED}
     * error code.
     */
    @Test
    public void testProducerFencedInAddPartitionToTxn() throws InterruptedException {
        verifyProducerFencedForAddPartitionsToTxn(Errors.PRODUCER_FENCED);
    }

    /**
     * Runs the shared AddPartitionsToTxn fencing scenario with an
     * {@code INVALID_PRODUCER_EPOCH} error code.
     */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn() throws InterruptedException {
        verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
    }

    /**
     * Shared scenario for the AddPartitionsToTxn fencing tests: begins a transaction on
     * {@code tp0}, appends a record, prepares an AddPartitionsToTxn response carrying
     * {@code errors} and delegates the fencing assertions to
     * {@link #verifyProducerFenced(Future)}.
     *
     * @param errors error code placed in the prepared AddPartitionsToTxn response
     * @throws InterruptedException if waiting on the record future is interrupted
     */
    private void verifyProducerFencedForAddPartitionsToTxn(Errors errors) throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);

        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(errors, this.tp0, (short) 1, 13131L);
        verifyProducerFenced(responseFuture);
    }

    /**
     * Runs the shared AddOffsetsToTxn fencing scenario with a {@code PRODUCER_FENCED}
     * error code.
     *
     * <p>The {@code INVALID_PRODUCER_EPOCH} variant is covered separately by
     * {@link #testInvalidProducerEpochConvertToProducerFencedInAddOffSetsToTxn()}; this
     * test must use a different error code, otherwise both tests exercise the same path.
     */
    @Test
    public void testProducerFencedInAddOffSetsToTxn() throws InterruptedException {
        verifyProducerFencedForAddOffsetsToTxn(Errors.PRODUCER_FENCED);
    }

    /**
     * Runs the shared AddOffsetsToTxn fencing scenario with an
     * {@code INVALID_PRODUCER_EPOCH} error code.
     */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInAddOffSetsToTxn() throws InterruptedException {
        verifyProducerFencedForAddOffsetsToTxn(Errors.INVALID_PRODUCER_EPOCH);
    }

    /**
     * Shared scenario for the AddOffsetsToTxn fencing tests: begins a transaction, sends an
     * empty offset map for group {@code "myConsumerGroup"}, appends a record to
     * {@code tp0}, prepares an AddOffsetsToTxn response carrying {@code errors} and
     * delegates the fencing assertions to {@link #verifyProducerFenced(Future)}.
     *
     * @param errors error code placed in the prepared AddOffsetsToTxn response
     * @throws InterruptedException if waiting on the record future is interrupted
     */
    private void verifyProducerFencedForAddOffsetsToTxn(Errors errors) throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("myConsumerGroup"));

        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddOffsetsToTxnResponse(errors, "myConsumerGroup", 13131L, (short) 1);
        verifyProducerFenced(responseFuture);
    }

    /**
     * Drives the sender until {@code future} completes and then asserts the fenced state:
     * the transaction manager reports an error, the future fails with an
     * {@code ExecutionException} whose cause is a {@code ProducerFencedException}, and
     * every further transactional call throws {@code ProducerFencedException}.
     *
     * @param future the record future expected to complete exceptionally
     * @throws InterruptedException if {@code future.get()} is interrupted
     */
    private void verifyProducerFenced(Future<RecordMetadata> future) throws InterruptedException {
        runUntil(future::isDone);
        Assertions.assertTrue(this.transactionManager.hasError());

        try {
            future.get();
            Assertions.fail("Expected to get a ExecutionException from the response");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Assertions.assertTrue(cause instanceof ProducerFencedException);
        }

        Assertions.assertThrows(ProducerFencedException.class, () -> this.transactionManager.beginTransaction());
        Assertions.assertThrows(ProducerFencedException.class, () -> this.transactionManager.beginCommit());
        Assertions.assertThrows(ProducerFencedException.class, () -> this.transactionManager.beginAbort());
        Assertions.assertThrows(ProducerFencedException.class,
            () -> this.transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
    }

    /**
     * Commits a transaction whose EndTxn response carries {@code INVALID_PRODUCER_EPOCH}
     * and checks that the commit result is acked but unsuccessful, that awaiting it throws
     * {@code KafkaException}, and that every further transactional call throws
     * {@code KafkaException} too.
     */
    @Test
    public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        // AddPartitions and Produce succeed; only the EndTxn response reports the error.
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(commitResult::isCompleted);
        runUntil(responseFuture::isDone);

        Assertions.assertThrows(KafkaException.class, commitResult::await);
        Assertions.assertFalse(commitResult.isSuccessful());
        Assertions.assertTrue(commitResult.isAcked());

        Assertions.assertThrows(KafkaException.class, () -> this.transactionManager.beginTransaction());
        Assertions.assertThrows(KafkaException.class, () -> this.transactionManager.beginCommit());
        Assertions.assertThrows(KafkaException.class, () -> this.transactionManager.beginAbort());
        Assertions.assertThrows(KafkaException.class,
            () -> this.transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
    }

    /**
     * Prepares a Produce response carrying {@code INVALID_PRODUCER_EPOCH}, waits for the
     * record future to complete, and checks that the manager reports an error. After
     * {@code beginAbort()} the next two requests handed out by the manager must be built
     * by an {@code EndTxnRequest.Builder} and an {@code InitProducerIdRequest.Builder},
     * in that order.
     */
    @Test
    public void testInvalidProducerEpochFromProduce() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, 13131L, (short) 1);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        this.sender.runOnce();
        runUntil(responseFuture::isDone);
        Assertions.assertTrue(this.transactionManager.hasError());

        this.transactionManager.beginAbort();

        TransactionManager.TxnRequestHandler endTxnHandler = this.transactionManager.nextRequest(false);
        Assertions.assertNotNull(endTxnHandler);
        Assertions.assertTrue(endTxnHandler.requestBuilder() instanceof EndTxnRequest.Builder);

        TransactionManager.TxnRequestHandler initPidHandler = this.transactionManager.nextRequest(false);
        Assertions.assertNotNull(initPidHandler);
        Assertions.assertTrue(initPidHandler.requestBuilder() instanceof InitProducerIdRequest.Builder);
    }

    /**
     * Begins a commit while a record is pending, answers the Produce request with
     * {@code OUT_OF_ORDER_SEQUENCE_NUMBER}, and checks that awaiting the commit throws
     * {@code KafkaException} and the record future fails with
     * {@code OutOfOrderSequenceException}. A following abort must succeed and leave the
     * manager ready.
     */
    @Test
    public void testDisallowCommitOnProduceFailure() throws InterruptedException {
        final long producerId = 13131L;
        final short epoch = (short) 1;

        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        TransactionalRequestResult commitResult = this.transactionManager.beginCommit();
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, producerId);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, producerId, epoch);
        runUntil(commitResult::isCompleted);
        Assertions.assertThrows(KafkaException.class, commitResult::await);
        TestUtils.assertFutureThrows(responseFuture, OutOfOrderSequenceException.class);

        // The abort path: EndTxn succeeds and a new InitProducerId response (epoch 2) is prepared.
        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
        prepareInitPidResponse(Errors.NONE, false, producerId, (short) 2);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.isReady());
    }

    /**
     * Answers the Produce request with {@code OUT_OF_ORDER_SEQUENCE_NUMBER}, waits until
     * the manager reports an abortable error, and checks that a subsequent abort
     * completes successfully and leaves the manager ready.
     */
    @Test
    public void testAllowAbortOnProduceFailure() throws InterruptedException {
        final long producerId = 13131L;
        final short epoch = (short) 1;

        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        Assertions.assertFalse(appendToAccumulator(this.tp0).isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, producerId);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, producerId, epoch);
        runUntil(this.transactionManager::hasAbortableError);

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
        prepareInitPidResponse(Errors.NONE, false, producerId, (short) 2);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.isReady());
    }

    /**
     * Starts an abort once the accumulator has been drained, then delivers a Produce
     * response carrying {@code OUT_OF_ORDER_SEQUENCE_NUMBER}. The manager must stay in
     * the aborting state without reporting an error, and the abort must finish
     * successfully with the manager ready.
     */
    @Test
    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        runUntil(() -> !this.accumulator.hasUndrained());

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        Assertions.assertTrue(this.transactionManager.isAborting());
        Assertions.assertFalse(this.transactionManager.hasError());

        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(responseFuture::isDone);

        // The failed produce must not change the state while the abort is still running.
        Assertions.assertTrue(this.transactionManager.isAborting());
        Assertions.assertFalse(this.transactionManager.hasError());

        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.isReady());
    }

    /**
     * Begins a commit while the appended record is still undrained and checks, step by
     * step, that no transactional request is in flight until the Produce response has
     * completed the record future; only then is a request observed in flight, and after
     * the EndTxn response the manager becomes ready.
     */
    @Test
    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);

        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertTrue(this.accumulator.hasUndrained());

        this.transactionManager.beginCommit();
        runUntil(() -> !this.accumulator.hasUndrained());
        Assertions.assertTrue(this.accumulator.hasIncomplete());
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());
        Assertions.assertFalse(responseFuture.isDone());

        // Counter-based condition: it turns true on its fourth evaluation.
        AtomicInteger evaluations = new AtomicInteger(0);
        runUntil(() -> evaluations.incrementAndGet() >= 4);
        Assertions.assertFalse(this.accumulator.hasUndrained());
        Assertions.assertTrue(this.accumulator.hasIncomplete());
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());
        Assertions.assertFalse(responseFuture.isDone());

        sendProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(responseFuture::isDone);
        Assertions.assertFalse(this.accumulator.hasUndrained());
        Assertions.assertFalse(this.accumulator.hasIncomplete());
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());

        runUntil(this.transactionManager::hasInFlightRequest);
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());
    }

    /**
     * Flushes the accumulator so the record is drained but incomplete, then begins a
     * commit. No transactional request may be in flight until the Produce response has
     * completed the record future; afterwards a request is observed in flight and the
     * EndTxn response brings the manager back to ready.
     */
    @Test
    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);

        prepareAddPartitionsToTxn(this.tp0, Errors.NONE);
        runUntil(() -> !this.transactionManager.hasPartitionsToAdd());
        Assertions.assertTrue(this.accumulator.hasUndrained());

        this.accumulator.beginFlush();
        runUntil(() -> !this.accumulator.hasUndrained());
        Assertions.assertFalse(this.accumulator.hasUndrained());
        Assertions.assertTrue(this.accumulator.hasIncomplete());
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());

        this.transactionManager.beginCommit();

        // Counter-based condition: it turns true on its fourth evaluation.
        AtomicInteger evaluations = new AtomicInteger(0);
        runUntil(() -> evaluations.incrementAndGet() >= 4);
        Assertions.assertFalse(this.accumulator.hasUndrained());
        Assertions.assertTrue(this.accumulator.hasIncomplete());
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());
        Assertions.assertFalse(responseFuture.isDone());

        sendProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(responseFuture::isDone);
        Assertions.assertFalse(this.accumulator.hasUndrained());
        Assertions.assertFalse(this.accumulator.hasIncomplete());
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());

        runUntil(this.transactionManager::hasInFlightRequest);
        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(this.transactionManager::isReady);
        Assertions.assertFalse(this.transactionManager.hasInFlightRequest());
    }

    /**
     * Moves the manager into an abortable error state while a request is in flight,
     * answers AddPartitionsToTxn with {@code NOT_COORDINATOR}, and checks that the
     * transaction coordinator is first cleared and then rediscovered as
     * {@code brokerNode} while the abortable error remains set.
     */
    @Test
    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        Assertions.assertFalse(appendToAccumulator(this.tp0).isDone());
        runUntil(this.transactionManager::hasInFlightRequest);

        this.transactionManager.transitionToAbortableError(new KafkaException());
        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, this.tp0, (short) 1, 13131L);
        runUntil(() -> this.transactionManager.coordinator(txnCoordinator) == null);

        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        runUntil(() -> this.transactionManager.coordinator(txnCoordinator) != null);
        Assertions.assertEquals(this.brokerNode, this.transactionManager.coordinator(txnCoordinator));
        Assertions.assertTrue(this.transactionManager.hasAbortableError());
    }

    /**
     * Aborts a transaction without preparing any broker responses and checks that the
     * abort still completes successfully, the manager is ready, and the appended record's
     * future fails with {@code KafkaException}.
     */
    @Test
    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        runUntil(abortResult::isCompleted);

        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.isReady());
        TestUtils.assertFutureThrows(responseFuture, KafkaException.class);
    }

    /**
     * Answers the first AddPartitionsToTxn request with
     * {@code UNKNOWN_TOPIC_OR_PARTITION}, begins an abort, and prepares a successful
     * AddPartitionsToTxn response followed by a successful EndTxn response. The abort
     * must succeed, the manager must be ready, and the record future must fail with
     * {@code KafkaException}.
     */
    @Test
    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
        final long producerId = 13131L;
        final short epoch = (short) 1;

        doInitTransactions(producerId, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, this.tp0, epoch, producerId);

        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertFalse(responseFuture.isDone());

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, producerId);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
        runUntil(abortResult::isCompleted);

        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.isReady());
        TestUtils.assertFutureThrows(responseFuture, KafkaException.class);
    }

    /**
     * Answers the first Produce request with {@code REQUEST_TIMED_OUT}, begins an abort,
     * and prepares a successful Produce response followed by a successful EndTxn
     * response. The abort must succeed, the manager must be ready, and the record future
     * must yield metadata whose topic equals that of {@code tp0}.
     */
    @Test
    public void testAbortResendsProduceRequestIfRetried() throws Exception {
        final long producerId = 13131L;
        final short epoch = (short) 1;

        doInitTransactions(producerId, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, producerId);
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, epoch);

        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertFalse(responseFuture.isDone());

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        prepareProduceResponse(Errors.NONE, producerId, epoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
        runUntil(abortResult::isCompleted);

        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.isReady());
        RecordMetadata metadata = (RecordMetadata) responseFuture.get();
        Assertions.assertEquals(this.tp0.topic(), metadata.topic());
    }

    /**
     * Answers the first AddPartitionsToTxn request with
     * {@code UNKNOWN_TOPIC_OR_PARTITION} and checks that {@code tp0} is not yet part of
     * the transaction. After a successful AddPartitionsToTxn response and a successful
     * Produce response are prepared, the partition must join the transaction and the
     * record future must complete.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        FutureRecordMetadata responseFuture = appendToAccumulator(this.tp0);
        Assertions.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, this.tp0, (short) 1, 13131L);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> this.transactionManager.transactionContainsPartition(this.tp0));
        runUntil(responseFuture::isDone);
    }

    /**
     * Runs the shared retriable TxnOffsetCommit scenario with an
     * {@code UNKNOWN_TOPIC_OR_PARTITION} partition error.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() {
        testRetriableErrorInTxnOffsetCommit(Errors.UNKNOWN_TOPIC_OR_PARTITION);
    }

    /**
     * Runs the shared retriable TxnOffsetCommit scenario with a
     * {@code COORDINATOR_LOAD_IN_PROGRESS} partition error.
     */
    @Test
    public void testHandlingOfCoordinatorLoadingErrorOnTxnOffsetCommit() {
        testRetriableErrorInTxnOffsetCommit(Errors.COORDINATOR_LOAD_IN_PROGRESS);
    }

    /**
     * Shared scenario for retriable TxnOffsetCommit errors.
     *
     * <p>Offsets for {@code tp0} and {@code tp1} are sent to the transaction. The first
     * prepared TxnOffsetCommit response reports {@code errors} for {@code tp1}; the
     * result must stay incomplete with offset commits still pending. A second response
     * with no errors must complete the result successfully.
     *
     * @param errors the partition-level error reported for {@code tp1} in the first
     *               TxnOffsetCommit response
     */
    private void testRetriableErrorInTxnOffsetCommit(Errors errors) {
        final FindCoordinatorRequest.CoordinatorType groupCoordinator = FindCoordinatorRequest.CoordinatorType.GROUP;

        doInitTransactions();
        this.transactionManager.beginTransaction();

        HashMap offsets = new HashMap();
        offsets.put(this.tp0, new OffsetAndMetadata(1L));
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult offsetsResult =
            this.transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("myConsumerGroup"));

        prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertFalse(offsetsResult.isCompleted());

        HashMap partitionErrors = new HashMap();
        partitionErrors.put(this.tp0, Errors.NONE);
        partitionErrors.put(this.tp1, errors);
        prepareFindCoordinatorResponse(Errors.NONE, false, groupCoordinator, "myConsumerGroup");
        prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, partitionErrors);

        Assertions.assertNull(this.transactionManager.coordinator(groupCoordinator));
        runUntil(() -> this.transactionManager.coordinator(groupCoordinator) != null);
        Assertions.assertTrue(this.transactionManager.hasPendingOffsetCommits());

        runUntil(this.transactionManager::hasPendingOffsetCommits);
        Assertions.assertFalse(offsetsResult.isCompleted());

        // Retry: this time every partition reports success.
        partitionErrors.put(this.tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, partitionErrors);
        runUntil(offsetsResult::isCompleted);
        Assertions.assertTrue(offsetsResult.isSuccessful());
    }

    /**
     * Runs the shared fatal TxnOffsetCommit scenario with a {@code PRODUCER_FENCED}
     * partition error.
     */
    @Test
    public void testHandlingOfProducerFencedErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.PRODUCER_FENCED);
    }

    /**
     * Runs the shared fatal TxnOffsetCommit scenario with a
     * {@code TRANSACTIONAL_ID_AUTHORIZATION_FAILED} partition error.
     */
    @Test
    public void testHandlingOfTransactionalIdAuthorizationFailedErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
    }

    /**
     * Runs the shared fatal TxnOffsetCommit scenario with an
     * {@code INVALID_PRODUCER_EPOCH} partition error, expecting the resulting exception
     * to be the one associated with {@code PRODUCER_FENCED}.
     */
    @Test
    public void testHandlingOfInvalidProducerEpochErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.INVALID_PRODUCER_EPOCH, Errors.PRODUCER_FENCED);
    }

    /**
     * Runs the shared fatal TxnOffsetCommit scenario with an
     * {@code UNSUPPORTED_FOR_MESSAGE_FORMAT} partition error.
     */
    @Test
    public void testHandlingOfUnsupportedForMessageFormatErrorOnTxnOffsetCommit() {
        testFatalErrorInTxnOffsetCommit(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
    }

    /**
     * Convenience overload for the case where the error returned in the response and the
     * error whose exception type is expected on the result are the same.
     *
     * @param errors the error used both as the response error and as the expected error
     */
    private void testFatalErrorInTxnOffsetCommit(Errors errors) {
        testFatalErrorInTxnOffsetCommit(errors, errors);
    }

    /**
     * Shared scenario for fatal TxnOffsetCommit errors.
     *
     * <p>Offsets for {@code tp0} and {@code tp1} are sent to the transaction and the
     * prepared TxnOffsetCommit response reports {@code errors} for {@code tp1}. The
     * result must complete unsuccessfully with an error of the same class as the
     * exception associated with {@code errors2}.
     *
     * @param errors  the partition-level error reported for {@code tp1}
     * @param errors2 the error whose exception class is expected on the result
     */
    private void testFatalErrorInTxnOffsetCommit(Errors errors, Errors errors2) {
        doInitTransactions();
        this.transactionManager.beginTransaction();

        HashMap offsets = new HashMap();
        offsets.put(this.tp0, new OffsetAndMetadata(1L));
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult offsetsResult =
            this.transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("myConsumerGroup"));

        prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        runUntil(() -> !this.client.hasPendingResponses());
        Assertions.assertFalse(offsetsResult.isCompleted());

        HashMap partitionErrors = new HashMap();
        partitionErrors.put(this.tp0, Errors.NONE);
        partitionErrors.put(this.tp1, errors);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myConsumerGroup");
        prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, partitionErrors);
        runUntil(offsetsResult::isCompleted);

        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertEquals(errors2.exception().getClass(), offsetsResult.error().getClass());
    }

    /**
     * Answers AddPartitionsToTxn with {@code TOPIC_AUTHORIZATION_FAILED} and checks that,
     * once the manager reports an error, {@code tp0} is not part of the transaction.
     */
    @Test
    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);
        Assertions.assertFalse(appendToAccumulator(this.tp0).isDone());

        prepareAddPartitionsToTxn(this.tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        runUntil(this.transactionManager::hasError);

        Assertions.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    /**
     * Answers AddPartitionsToTxn with {@code TOPIC_AUTHORIZATION_FAILED}, then begins an
     * abort without preparing an EndTxn response. The abort must not be complete
     * immediately but must complete successfully once the sender has been driven.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() {
        doInitTransactions();
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartition(this.tp0);

        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, this.tp0, (short) 1, 13131L);
        runUntil(() -> !this.client.hasPendingResponses());

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        Assertions.assertFalse(abortResult.isCompleted());

        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
    }

    /**
     * Begins an abort right after sending offsets, answers AddOffsetsToTxn with
     * {@code GROUP_AUTHORIZATION_FAILED} and prepares no EndTxn response. The abort must
     * nevertheless complete successfully and leave the manager ready.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() {
        doInitTransactions();
        this.transactionManager.beginTransaction();

        HashMap offsets = new HashMap();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        this.transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("myConsumerGroup"));

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, "myConsumerGroup", 13131L, (short) 1);
        runUntil(abortResult::isCompleted);

        Assertions.assertTrue(this.transactionManager.isReady());
        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
    }

    /**
     * Begins an abort right after sending offsets and answers AddOffsetsToTxn with
     * {@code UNKNOWN_SERVER_ERROR}. The abort must complete unsuccessfully and the
     * manager must report a fatal error.
     */
    @Test
    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() {
        doInitTransactions();
        this.transactionManager.beginTransaction();

        HashMap offsets = new HashMap();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        this.transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("myConsumerGroup"));

        TransactionalRequestResult abortResult = this.transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, "myConsumerGroup", 13131L, (short) 1);
        runUntil(abortResult::isCompleted);

        Assertions.assertFalse(abortResult.isSuccessful());
        Assertions.assertTrue(this.transactionManager.hasFatalError());
    }

    /**
     * Sends offsets with full consumer group metadata. The first prepared
     * TxnOffsetCommit response reports {@code COORDINATOR_LOAD_IN_PROGRESS} for
     * {@code tp1}, after which offset commits must still be pending and the result
     * incomplete. A second response without errors must complete the result successfully.
     */
    @Test
    public void testSendOffsetsWithGroupMetadata() {
        HashMap partitionErrors = new HashMap();
        partitionErrors.put(this.tp0, Errors.NONE);
        partitionErrors.put(this.tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);

        TransactionalRequestResult offsetsResult = prepareGroupMetadataCommit(
            () -> prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, "instance", "member", 5, partitionErrors));

        this.sender.runOnce();
        Assertions.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        Assertions.assertFalse(offsetsResult.isCompleted());

        // Retry: this time every partition reports success.
        partitionErrors.put(this.tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, "instance", "member", 5, partitionErrors);
        this.sender.runOnce();
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertTrue(offsetsResult.isSuccessful());
    }

    /**
     * Restricts the node's {@code TXN_OFFSET_COMMIT} API versions to the range 0..2 and
     * sends offsets with full consumer group metadata. The result must complete
     * unsuccessfully with an {@code UnsupportedVersionException}, which must also be
     * recorded as the fatal error.
     */
    @Test
    public void testSendOffsetWithGroupMetadataFailAsAutoDowngradeTxnCommitNotEnabled() {
        this.client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.TXN_OFFSET_COMMIT.id, (short) 0, (short) 2));

        HashMap partitionErrors = new HashMap();
        partitionErrors.put(this.tp0, Errors.NONE);
        partitionErrors.put(this.tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);

        TransactionalRequestResult offsetsResult = prepareGroupMetadataCommit(
            () -> prepareTxnOffsetCommitResponse("myConsumerGroup", 13131L, (short) 1, partitionErrors));

        this.sender.runOnce();
        Assertions.assertTrue(offsetsResult.isCompleted());
        Assertions.assertFalse(offsetsResult.isSuccessful());
        Assertions.assertTrue(offsetsResult.error() instanceof UnsupportedVersionException);
        assertFatalError(UnsupportedVersionException.class);
    }

    /**
     * Sets up an offset commit that carries full consumer group metadata (generation 5,
     * member {@code "member"}, instance {@code "instance"}) and drives the sender until
     * the group coordinator is known and offset commits are pending.
     *
     * @param runnable callback invoked after the FindCoordinator response has been
     *                 prepared; callers use it to prepare the TxnOffsetCommit response
     * @return the result of the {@code sendOffsetsToTransaction} call, still pending
     */
    private TransactionalRequestResult prepareGroupMetadataCommit(Runnable runnable) {
        final FindCoordinatorRequest.CoordinatorType groupCoordinator = FindCoordinatorRequest.CoordinatorType.GROUP;

        doInitTransactions();
        this.transactionManager.beginTransaction();

        HashMap offsets = new HashMap();
        offsets.put(this.tp0, new OffsetAndMetadata(1L));
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("myConsumerGroup", 5, "member", Optional.of("instance"));
        TransactionalRequestResult offsetsResult = this.transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);

        prepareAddOffsetsToTxnResponse(Errors.NONE, "myConsumerGroup", 13131L, (short) 1);
        this.sender.runOnce();
        Assertions.assertFalse(offsetsResult.isCompleted());

        prepareFindCoordinatorResponse(Errors.NONE, false, groupCoordinator, "myConsumerGroup");
        runnable.run();
        Assertions.assertNull(this.transactionManager.coordinator(groupCoordinator));

        // Two sender iterations are run before the group coordinator is expected to be set.
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertNotNull(this.transactionManager.coordinator(groupCoordinator));
        Assertions.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        return offsetsResult;
    }

    /**
     * Verifies that the accumulator drains no batches for partitions that were registered through
     * {@code maybeAddPartition} but for which {@code isSendToPartitionAllowed} is still false, and
     * that doing so does not put the transaction manager into an error state.
     */
    @Test
    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        appendToAccumulator(tp0);
        transactionManager.maybeAddPartition(tp1);
        appendToAccumulator(tp1);

        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp1));

        // tp0 is led by node1 and tp1 by node2 in the metadata handed to drain().
        Node node1 = new Node(0, "localhost", 1111);
        Node node2 = new Node(1, "localhost", 1112);
        Map<Integer, Node> nodesById = new HashMap<>();
        nodesById.put(node1.id(), node1);
        nodesById.put(node2.id(), node2);
        MetadataSnapshot snapshot = new MetadataSnapshot(
            (String) null,
            nodesById,
            Arrays.asList(
                new MetadataResponse.PartitionMetadata(Errors.NONE, tp0, Optional.of(node1.id()), Optional.empty(), null, null, null),
                new MetadataResponse.PartitionMetadata(Errors.NONE, tp1, Optional.of(node2.id()), Optional.empty(), null, null, null)),
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.emptySet(),
            (Node) null,
            Collections.emptyMap());

        Map<Integer, List<ProducerBatch>> drained = accumulator.drain(
            snapshot, new HashSet<>(Arrays.asList(node1, node2)), MAX_RETRIES, time.milliseconds());

        // Both nodes get an entry, but neither entry contains a batch.
        Assertions.assertTrue(drained.containsKey(node1.id()));
        Assertions.assertTrue(drained.get(node1.id()).isEmpty());
        Assertions.assertTrue(drained.containsKey(node2.id()));
        Assertions.assertTrue(drained.get(node2.id()).isEmpty());
        Assertions.assertFalse(transactionManager.hasError());
    }

    /**
     * Verifies that a partition already contained in the transaction ({@code tp1}) can still be
     * drained after adding another partition ({@code tp0}) failed with
     * {@code TOPIC_AUTHORIZATION_FAILED} and left the transaction manager in an abortable error.
     */
    @Test
    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();

        transactionManager.maybeAddPartition(tp1);
        prepareAddPartitionsToTxn(tp1, Errors.NONE);
        runUntil(() -> transactionManager.transactionContainsPartition(tp1));

        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        runUntil(transactionManager::hasAbortableError);
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp1));

        // Metadata that names a single node as the leader of tp1.
        Node leader = new Node(1, "localhost", 1112);
        MetadataSnapshot snapshot = new MetadataSnapshot(
            (String) null,
            Collections.singletonMap(leader.id(), leader),
            Arrays.asList(
                new MetadataResponse.PartitionMetadata(Errors.NONE, tp1, Optional.of(leader.id()), Optional.empty(), null, null, null)),
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.emptySet(),
            (Node) null,
            Collections.emptyMap());

        appendToAccumulator(tp1);
        Map<Integer, List<ProducerBatch>> drained =
            accumulator.drain(snapshot, Collections.singleton(leader), MAX_RETRIES, time.milliseconds());

        // Exactly one batch is drained for the leader, and the abortable error is still set.
        Assertions.assertTrue(drained.containsKey(leader.id()));
        Assertions.assertEquals(1, drained.get(leader.id()).size());
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Appends a record for {@code tp0} without ever calling {@code maybeAddPartition(tp0)} and
     * verifies that draining yields an entry for the leader node that contains no batches.
     */
    @Test
    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        // Deliberately no maybeAddPartition(tp0) before the append.
        appendToAccumulator(tp0);

        Node leader = new Node(0, "localhost", 1111);
        MetadataSnapshot snapshot = new MetadataSnapshot(
            (String) null,
            Collections.singletonMap(leader.id(), leader),
            Arrays.asList(
                new MetadataResponse.PartitionMetadata(Errors.NONE, tp0, Optional.of(leader.id()), Optional.empty(), null, null, null)),
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.emptySet(),
            (Node) null,
            Collections.emptyMap());

        Map<Integer, List<ProducerBatch>> drained =
            accumulator.drain(snapshot, Collections.singleton(leader), MAX_RETRIES, time.milliseconds());

        Assertions.assertTrue(drained.containsKey(leader.id()));
        Assertions.assertTrue(drained.get(leader.id()).isEmpty());
    }

    /**
     * Verifies that a produce request which first failed with {@code NOT_LEADER_OR_FOLLOWER} is
     * sent again and completes after the transaction manager has been moved to an abortable
     * error state.
     */
    @Test
    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, 13131L, (short) 1);

        // Drive the sender until both queued responses have been consumed.
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(sendFuture.isDone());

        transactionManager.transitionToAbortableError(new KafkaException());
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);

        runUntil(sendFuture::isDone);
        Assertions.assertNotNull(sendFuture.get());
    }

    /**
     * Verifies that when a queued batch for {@code tp0} fails with a {@link TimeoutException}
     * after time has advanced and the node has been disconnected, the transaction manager ends
     * up in an abortable error state.
     */
    @Test
    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(sendFuture.isDone());

        // Advance time by 10000 ms and make the node unreachable before driving the sender again.
        time.sleep(10000L);
        Node firstNode = metadata.fetch().nodes().get(0);
        client.disconnect(firstNode.idString());
        client.backoff(firstNode, 100L);

        runUntil(sendFuture::isDone);

        // assertThrows (already used elsewhere in this file) replaces the manual try/fail/catch.
        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class,
            () -> sendFuture.get(),
            "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);
        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Same scenario as the single-batch expiry test, but with one queued batch each for
     * {@code tp0} and {@code tp1}: both futures must fail with a {@link TimeoutException} and the
     * transaction manager must end up in an abortable error state.
     */
    @Test
    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        transactionManager.maybeAddPartition(tp1);

        FutureRecordMetadata firstFuture = appendToAccumulator(tp0);
        FutureRecordMetadata secondFuture = appendToAccumulator(tp1);
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());

        Map<TopicPartition, Errors> addPartitionErrors = new HashMap<>();
        addPartitionErrors.put(tp0, Errors.NONE);
        addPartitionErrors.put(tp1, Errors.NONE);
        prepareAddPartitionsToTxn(addPartitionErrors);

        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.transactionContainsPartition(tp1));
        // The original asserted tp1 twice here; check each partition once so tp0 is covered too.
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());

        // Advance time by 10000 ms and make the node unreachable before driving the sender again.
        time.sleep(10000L);
        Node firstNode = metadata.fetch().nodes().get(0);
        client.disconnect(firstNode.idString());
        client.backoff(firstNode, 100L);

        runUntil(firstFuture::isDone);
        runUntil(secondFuture::isDone);

        ExecutionException firstFailure = Assertions.assertThrows(
            ExecutionException.class,
            () -> firstFuture.get(),
            "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(firstFailure.getCause() instanceof TimeoutException);

        ExecutionException secondFailure = Assertions.assertThrows(
            ExecutionException.class,
            () -> secondFuture.get(),
            "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(secondFailure.getCause() instanceof TimeoutException);

        Assertions.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Verifies that a commit started while a batch is still queued fails with a
     * {@link TimeoutException} once that batch fails with a timeout, that the transaction remains
     * ongoing in an abortable error state, and that a subsequent abort succeeds and clears the
     * transaction.
     */
    @Test
    public void testDropCommitOnBatchExpiry() throws InterruptedException {
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assertions.assertFalse(sendFuture.isDone());

        TransactionalRequestResult commitResult = transactionManager.beginCommit();

        // Advance time by 10000 ms and disconnect the node before driving the sender again.
        time.sleep(10000L);
        client.disconnect(metadata.fetch().nodes().get(0).idString());

        runUntil(sendFuture::isDone);

        // Same assertThrows style as the commit check below, instead of a manual try/fail/catch.
        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class,
            () -> sendFuture.get(),
            "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);

        runUntil(commitResult::isCompleted);
        Assertions.assertFalse(commitResult.isSuccessful());
        Assertions.assertThrows(TimeoutException.class, commitResult::await);

        // The failed commit leaves the transaction open and abortable.
        Assertions.assertTrue(transactionManager.hasAbortableError());
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());
        Assertions.assertFalse(transactionManager.isCompleting());
        Assertions.assertTrue(transactionManager.transactionContainsPartition(tp0));

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);

        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * Verifies that when a batch that already failed once with {@code NOT_LEADER_OR_FOLLOWER}
     * later fails with a {@link TimeoutException}, a commit started in the meantime completes
     * unsuccessfully and the transaction manager reports a fatal error with no ongoing
     * transaction. The API versions registered up front cap INIT_PRODUCER_ID at version 1 and
     * PRODUCE at version 7.
     */
    @Test
    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException {
        apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create(Arrays.asList(
            new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 1),
            new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7))));
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assertions.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assertions.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
        Assertions.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));

        // First produce attempt fails with an error that leaves the future incomplete.
        prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, 13131L, (short) 1);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(sendFuture.isDone());

        TransactionalRequestResult commitResult = transactionManager.beginCommit();

        // Advance time by 10000 ms and make the node unreachable before driving the sender again.
        time.sleep(10000L);
        Node firstNode = metadata.fetch().nodes().get(0);
        client.disconnect(firstNode.idString());
        client.backoff(firstNode, 100L);

        runUntil(sendFuture::isDone);

        // assertThrows (already used elsewhere in this file) replaces the manual try/fail/catch.
        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class,
            () -> sendFuture.get(),
            "Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);

        runUntil(commitResult::isCompleted);
        Assertions.assertFalse(commitResult.isSuccessful());
        Assertions.assertTrue(transactionManager.hasFatalError());
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * With a transaction manager initialised via {@code Optional.empty()} and producer id 15 at
     * epoch 5: an acknowledged batch whose sequence is marked unresolved resolves without
     * changing the producer id/epoch, whereas a batch that fails with a timeout resolves and the
     * epoch eventually becomes 6.
     */
    @Test
    public void testBumpEpochAfterTimeoutWithoutPendingInflightRequests() {
        initializeTransactionManager(Optional.empty());
        long pid = 15L;
        short startEpoch = 5;
        ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(pid, startEpoch);
        initializeIdempotentProducerId(pid, startEpoch);

        // Nothing is in flight or unresolved yet, so the id/epoch must be unchanged.
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());

        TopicPartition fooPartition = new TopicPartition("foo", 0);
        Assertions.assertEquals(0, transactionManager.sequenceNumber(fooPartition));

        // First batch: completed successfully, then marked unresolved and resolved.
        ProducerBatch ackedBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "1");
        Assertions.assertEquals(1, transactionManager.sequenceNumber(fooPartition));
        ProduceResponse.PartitionResponse okResponse =
            new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(ackedBatch, okResponse);
        Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(fooPartition));
        transactionManager.markSequenceUnresolved(ackedBatch);
        transactionManager.maybeResolveSequences();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());

        // Second batch: marked unresolved and failed with a timeout.
        ProducerBatch timedOutBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "2");
        Assertions.assertEquals(2, transactionManager.sequenceNumber(fooPartition));
        transactionManager.markSequenceUnresolved(timedOutBatch);
        transactionManager.handleFailedBatch(timedOutBatch, new TimeoutException(), false);
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());
        transactionManager.maybeResolveSequences();
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());

        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 6);
    }

    /**
     * Writes three batches to one partition, fails the first two with timeouts and completes the
     * third successfully; the producer id/epoch (15 / 5) must stay unchanged throughout and the
     * next sequence number must remain 3.
     */
    @Test
    public void testNoProducerIdResetAfterLastInFlightBatchSucceeds() {
        initializeTransactionManager(Optional.empty());
        long pid = 15L;
        short startEpoch = 5;
        ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(pid, startEpoch);
        initializeIdempotentProducerId(pid, startEpoch);

        TopicPartition fooPartition = new TopicPartition("foo", 0);
        ProducerBatch firstBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "1");
        ProducerBatch secondBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "2");
        ProducerBatch thirdBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "3");
        Assertions.assertEquals(3, transactionManager.sequenceNumber(fooPartition));

        // First batch times out; its sequence stays unresolved and the id/epoch is untouched.
        transactionManager.markSequenceUnresolved(firstBatch);
        transactionManager.handleFailedBatch(firstBatch, new TimeoutException(), false);
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // Second batch times out as well; still no change.
        transactionManager.handleFailedBatch(secondBatch, new TimeoutException(), false);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // Third batch succeeds, after which the unresolved sequences are cleared.
        ProduceResponse.PartitionResponse okResponse =
            new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(thirdBatch, okResponse);
        transactionManager.maybeResolveSequences();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());
        Assertions.assertEquals(3, transactionManager.sequenceNumber(fooPartition));
    }

    /**
     * Writes three batches to one partition: the first times out, the second succeeds and the
     * third times out. After the third failure the epoch is expected to reach 2, the unresolved
     * sequences to be cleared and the partition's sequence number to be back at 0.
     */
    @Test
    public void testEpochBumpAfterLastInflightBatchFails() {
        initializeTransactionManager(Optional.empty());
        long pid = 13131L;
        short startEpoch = 1;
        ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(pid, startEpoch);
        initializeIdempotentProducerId(pid, startEpoch);

        TopicPartition fooPartition = new TopicPartition("foo", 0);
        ProducerBatch firstBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "1");
        ProducerBatch secondBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "2");
        ProducerBatch thirdBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "3");
        Assertions.assertEquals(3, transactionManager.sequenceNumber(fooPartition));

        transactionManager.markSequenceUnresolved(firstBatch);
        transactionManager.handleFailedBatch(firstBatch, new TimeoutException(), false);
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        // A successful second batch does not change the id/epoch while the third is outstanding.
        ProduceResponse.PartitionResponse okResponse =
            new ProduceResponse.PartitionResponse(Errors.NONE, 500L, time.milliseconds(), 0L);
        transactionManager.handleCompletedBatch(secondBatch, okResponse);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(initialIdAndEpoch, transactionManager.producerIdAndEpoch());
        Assertions.assertTrue(transactionManager.hasUnresolvedSequences());

        transactionManager.handleFailedBatch(thirdBatch, new TimeoutException(), false);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);
        Assertions.assertFalse(transactionManager.hasUnresolvedSequences());
        Assertions.assertEquals(0, transactionManager.sequenceNumber(fooPartition));
    }

    /**
     * Verifies that a failed batch bumps the epoch from 5 to 6 while the manager is healthy, but
     * that once the manager has been moved to a fatal error a further failed batch leaves the
     * producer id/epoch unchanged.
     */
    @Test
    public void testNoFailedBatchHandlingWhenTxnManagerIsInFatalError() {
        initializeTransactionManager(Optional.empty());
        long pid = 15L;
        short startEpoch = 5;
        initializeIdempotentProducerId(pid, startEpoch);
        TopicPartition fooPartition = new TopicPartition("foo", 0);

        // Before the fatal error: an out-of-order-sequence failure leads to epoch 6.
        ProducerBatch outOfOrderBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "1");
        transactionManager.handleFailedBatch(outOfOrderBatch, new OutOfOrderSequenceException("out of sequence"), false);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        ProducerIdAndEpoch bumpedIdAndEpoch = new ProducerIdAndEpoch(pid, (short) (startEpoch + 1));
        Assertions.assertEquals(bumpedIdAndEpoch, transactionManager.producerIdAndEpoch());

        // After the fatal error: the same id/epoch is still reported after another failure.
        transactionManager.transitionToFatalError(new KafkaException());
        ProducerBatch lateBatch = writeIdempotentBatchWithValue(transactionManager, fooPartition, "2");
        transactionManager.handleFailedBatch(lateBatch, new TimeoutException(), true);
        transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
        Assertions.assertEquals(bumpedIdAndEpoch, transactionManager.producerIdAndEpoch());
    }

    /**
     * With INIT_PRODUCER_ID capped at version 1 and PRODUCE at version 7: two batches succeed, a
     * third fails with {@code TOPIC_AUTHORIZATION_FAILED}, the transaction is aborted, and in the
     * next transaction the sequence number for {@code tp0} is expected to be 2.
     */
    @Test
    public void testAbortTransactionAndReuseSequenceNumberOnError() throws InterruptedException {
        apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create(Arrays.asList(
            new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 1),
            new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7))));
        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        // First successful send.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        runUntil(firstSend::isDone);

        // Second successful send.
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(secondSend::isDone);

        // Third send fails and puts the manager into an abortable error.
        FutureRecordMetadata failedSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 13131L, (short) 1);
        runUntil(failedSend::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // New transaction on the same partition.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp0));
    }

    /**
     * With INIT_PRODUCER_ID capped at version 1 and PRODUCE at version 7: one batch succeeds on
     * {@code tp1}, two succeed on {@code tp0}, and a third on {@code tp0} fails with
     * {@code UNKNOWN_PRODUCER_ID}. After aborting and starting a new transaction the sequence
     * number is expected to be 0 for {@code tp0} and 1 for {@code tp1}.
     */
    @Test
    public void testAbortTransactionAndResetSequenceNumberOnUnknownProducerId() throws InterruptedException {
        apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create(Arrays.asList(
            new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 1),
            new ApiVersionsResponseData.ApiVersion()
                .setApiKey(ApiKeys.PRODUCE.id)
                .setMinVersion((short) 0)
                .setMaxVersion((short) 7))));
        doInitTransactions();
        transactionManager.beginTransaction();

        // One successful send to tp1.
        transactionManager.maybeAddPartition(tp1);
        FutureRecordMetadata tp1Send = appendToAccumulator(tp1);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1, tp1);
        runUntil(tp1Send::isDone);
        Assertions.assertTrue(transactionManager.isPartitionAdded(tp1));

        // Two successful sends to tp0.
        transactionManager.maybeAddPartition(tp0);
        FutureRecordMetadata firstTp0Send = appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(firstTp0Send::isDone);
        Assertions.assertTrue(transactionManager.isPartitionAdded(tp0));

        FutureRecordMetadata secondTp0Send = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(secondTp0Send::isDone);

        // Third send to tp0 is answered with UNKNOWN_PRODUCER_ID.
        FutureRecordMetadata failedTp0Send = appendToAccumulator(tp0);
        client.prepareResponse(
            produceRequestMatcher(13131L, (short) 1, tp0),
            (AbstractResponse) produceResponse(tp0, 0L, Errors.UNKNOWN_PRODUCER_ID, 0, 0));
        runUntil(failedTp0Send::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        runUntil(abortResult::isCompleted);
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // New transaction on tp0.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(1, transactionManager.sequenceNumber(tp1));
    }

    /**
     * Two batches succeed and a third fails with {@code TOPIC_AUTHORIZATION_FAILED}; aborting the
     * transaction is followed by an InitProducerId response carrying epoch 2. The test expects
     * the epoch to become 2, the abort to succeed, and the sequence number for {@code tp0} to be
     * 0 in the next transaction.
     */
    @Test
    public void testBumpTransactionalEpochOnAbortableError() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        // Two successful sends.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(firstSend::isDone);

        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(secondSend::isDone);

        // Third send fails and puts the manager into an abortable error.
        FutureRecordMetadata failedSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 13131L, (short) 1);
        runUntil(failedSend::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        // Abort, with an InitProducerId response for epoch 2 queued behind the EndTxn response.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);

        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // New transaction under the bumped epoch.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 2, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp0));
    }

    /**
     * Two batches succeed and a third is answered with {@code UNKNOWN_PRODUCER_ID}; aborting the
     * transaction is followed by an InitProducerId response carrying epoch 2. The test expects
     * the epoch to become 2, the abort to succeed, and the sequence number for {@code tp0} to be
     * 0 in the next transaction.
     */
    @Test
    public void testBumpTransactionalEpochOnUnknownProducerIdError() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        // Two successful sends.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(firstSend::isDone);

        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        runUntil(secondSend::isDone);

        // Third send is answered with UNKNOWN_PRODUCER_ID.
        FutureRecordMetadata failedSend = appendToAccumulator(tp0);
        client.prepareResponse(
            produceRequestMatcher(13131L, (short) 1, tp0),
            (AbstractResponse) produceResponse(tp0, 0L, Errors.UNKNOWN_PRODUCER_ID, 0, 0));
        runUntil(failedSend::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        // Abort, with an InitProducerId response for epoch 2 queued behind the EndTxn response.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        prepareInitPidResponse(Errors.NONE, false, 13131L, (short) 2);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);

        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // New transaction under the bumped epoch.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 2, 13131L);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp0));
    }

    /**
     * Two produce requests succeed; the third is left in flight while the mock clock advances and
     * the node is disconnected. The test expects an abortable error, and expects the abort to end
     * with the producer on epoch 2 and the sequence number of tp0 back at zero.
     */
    @Test
    public void testBumpTransactionalEpochOnTimeout() throws InterruptedException {
        final long pid = 13131L;
        final short startEpoch = (short) 1;
        final short nextEpoch = (short) 2;

        doInitTransactions(pid, startEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, startEpoch, pid);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));

        // Two produce requests that succeed.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, pid, startEpoch);
        runUntil(firstSend::isDone);

        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        prepareProduceResponse(Errors.NONE, pid, startEpoch);
        runUntil(secondSend::isDone);

        // Third request: no response is prepared; wait until it is in flight, advance the clock,
        // then drop the connection and put the node into backoff.
        FutureRecordMetadata thirdSend = appendToAccumulator(tp0);
        runUntil(client::hasInFlightRequests);
        time.sleep(10000L);
        Node firstNode = metadata.fetch().nodes().get(0);
        client.disconnect(firstNode.idString());
        client.backoff(firstNode, 100L);
        runUntil(thirdSend::isDone);
        Assertions.assertTrue(transactionManager.hasAbortableError());

        // Abort; after the backoff window a coordinator lookup, EndTxn and InitProducerId are served.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        sender.runOnce();
        time.sleep(110L);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, startEpoch);
        prepareInitPidResponse(Errors.NONE, false, pid, nextEpoch);
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == nextEpoch);

        Assertions.assertTrue(abortResult.isCompleted());
        Assertions.assertTrue(abortResult.isSuccessful());
        abortResult.await();
        Assertions.assertTrue(transactionManager.isReady());

        // A new transaction on the new epoch starts with sequence 0 for tp0.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, nextEpoch, pid);
        runUntil(() -> transactionManager.isPartitionAdded(tp0));
        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp0));
    }

    /**
     * An AddPartitionsToTxn response carrying INVALID_PRODUCER_ID_MAPPING must produce an abortable
     * error. The subsequent abort is expected to succeed with only an InitProducerId response
     * prepared, leaving the producer on epoch 2 and the manager ready.
     */
    @Test
    public void testBumpTransactionalEpochOnRecoverableAddPartitionRequestError() {
        final long pid = 13131L;
        final short startEpoch = (short) 1;
        final short nextEpoch = (short) 2;

        doInitTransactions(pid, startEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        prepareAddPartitionsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, tp0, startEpoch, pid);
        runUntil(transactionManager::hasAbortableError);

        // No EndTxn response is prepared here; only the InitProducerId response is queued.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareInitPidResponse(Errors.NONE, false, pid, nextEpoch);
        runUntil(abortResult::isCompleted);

        Assertions.assertEquals(nextEpoch, transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * An AddOffsetsToTxn response carrying INVALID_PRODUCER_ID_MAPPING must produce an abortable
     * error. The subsequent abort (EndTxn followed by InitProducerId) is expected to succeed and
     * leave the producer on epoch 2 with the manager ready.
     */
    @Test
    public void testBumpTransactionalEpochOnRecoverableAddOffsetsRequestError() throws InterruptedException {
        final long pid = 13131L;
        final short startEpoch = (short) 1;
        final short nextEpoch = (short) 2;
        final String groupId = "myConsumerGroup";

        doInitTransactions(pid, startEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);

        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
        Assertions.assertFalse(sendFuture.isDone());
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, startEpoch, pid);
        prepareProduceResponse(Errors.NONE, pid, startEpoch);
        runUntil(sendFuture::isDone);

        // Typed map instead of a raw HashMap so the compiler checks key and value types.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp0, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupId));
        Assertions.assertFalse(transactionManager.hasPendingOffsetCommits());

        prepareAddOffsetsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, groupId, pid, startEpoch);
        runUntil(transactionManager::hasAbortableError);

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, startEpoch);
        prepareInitPidResponse(Errors.NONE, false, pid, nextEpoch);
        runUntil(abortResult::isCompleted);

        Assertions.assertEquals(nextEpoch, transactionManager.producerIdAndEpoch().epoch);
        Assertions.assertTrue(abortResult.isSuccessful());
        Assertions.assertTrue(transactionManager.isReady());
    }

    /**
     * Idempotent-producer scenario with two partitions. An UNKNOWN_PRODUCER_ID on tp0 is followed
     * by the producer reaching epoch 2, while the in-flight tp1 batch is expected to keep epoch 1
     * and its original sequence until it completes. Only afterwards is tp1 expected to restart at
     * sequence 0 and send its next batch with epoch 2.
     */
    @Test
    public void testHealthyPartitionRetriesDuringEpochBump() throws InterruptedException {
        initializeTransactionManager(Optional.empty());
        // A separate Sender instance that is stepped manually through runOnce() below.
        Sender manualSender = new Sender(logContext, client, metadata, accumulator, false, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES,
                new SenderMetricsRegistry(new Metrics(time)), time, 1000, 50L, transactionManager, apiVersions);
        initializeIdempotentProducerId(13131L, (short) 1);

        ProducerBatch tp0First = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch tp0Second = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        writeIdempotentBatchWithValue(transactionManager, tp0, "3");
        ProducerBatch tp1First = writeIdempotentBatchWithValue(transactionManager, tp1, "4");
        ProducerBatch tp1Second = writeIdempotentBatchWithValue(transactionManager, tp1, "5");
        Assertions.assertEquals(3, transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp1));

        // The first batch of each partition completes successfully.
        long appendTime = time.milliseconds();
        ProduceResponse.PartitionResponse tp0FirstOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp0First.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp0First, tp0FirstOk);

        ProduceResponse.PartitionResponse tp1FirstOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp1First.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp1First, tp1FirstOk);

        // The second tp0 batch gets UNKNOWN_PRODUCER_ID; the test then waits for epoch 2.
        ProduceResponse.PartitionResponse tp0SecondError =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L);
        Assertions.assertTrue(transactionManager.canRetry(tp0SecondError, tp0Second));
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);

        // tp0: the pending batch has been rewritten to sequence 0 / epoch 2.
        Assertions.assertEquals(tp0Second, transactionManager.nextBatchBySequence(tp0));
        Assertions.assertEquals(0, transactionManager.firstInFlightSequence(tp0));
        Assertions.assertEquals(0, tp0Second.baseSequence());
        Assertions.assertTrue(tp0Second.sequenceHasBeenReset());
        Assertions.assertEquals(2, tp0Second.producerEpoch());

        // tp1: the pending batch is untouched (sequence 1 / epoch 1).
        Assertions.assertEquals(tp1Second, transactionManager.nextBatchBySequence(tp1));
        Assertions.assertEquals(1, transactionManager.firstInFlightSequence(tp1));
        Assertions.assertEquals(1, tp1Second.baseSequence());
        Assertions.assertFalse(tp1Second.sequenceHasBeenReset());
        Assertions.assertEquals(1, tp1Second.producerEpoch());

        // A freshly appended tp1 record stays queued after one sender pass.
        appendToAccumulator(tp1);
        manualSender.runOnce();
        Assertions.assertEquals(1, accumulator.getDeque(tp1).size());

        // The in-flight tp1 batch is retried after NOT_LEADER_OR_FOLLOWER and keeps epoch 1.
        ProduceResponse.PartitionResponse tp1SecondError =
                new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L, 600L);
        Assertions.assertTrue(transactionManager.canRetry(tp1SecondError, tp1Second));
        accumulator.reenqueue(tp1Second, time.milliseconds());
        manualSender.runOnce();
        Assertions.assertEquals(1, accumulator.getDeque(tp1).size());
        Assertions.assertNotEquals(tp1Second, accumulator.getDeque(tp1).peek());
        Assertions.assertEquals((short) 1, tp1Second.producerEpoch());

        // Completing the retried batch leaves tp1 with nothing in flight and sequence 0.
        ProduceResponse.PartitionResponse tp1SecondOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp1Second.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp1Second, tp1SecondOk);
        transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
        Assertions.assertFalse(transactionManager.hasInflightBatches(tp1));
        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp1));

        // The queued tp1 record is drained next and carries epoch 2.
        runUntil(() -> transactionManager.hasInflightBatches(tp1));
        Assertions.assertTrue(accumulator.getDeque(tp1).isEmpty());
        ProducerBatch tp1Third = transactionManager.nextBatchBySequence(tp1);
        Assertions.assertEquals(2, tp1Third.producerEpoch());

        ProduceResponse.PartitionResponse tp1ThirdOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp1Third.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp1Third, tp1ThirdOk);
        transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
        Assertions.assertFalse(transactionManager.hasInflightBatches(tp1));
        Assertions.assertEquals(1, transactionManager.sequenceNumber(tp1));
    }

    /** Runs the shared retry scenario with ABORT as both the first and the repeated operation. */
    @Test
    public void testRetryAbortTransaction() throws InterruptedException {
        final TransactionResult abort = TransactionResult.ABORT;
        verifyCommitOrAbortTransactionRetriable(abort, abort);
    }

    /** Runs the shared retry scenario with COMMIT as both the first and the repeated operation. */
    @Test
    public void testRetryCommitTransaction() throws InterruptedException {
        final TransactionResult commit = TransactionResult.COMMIT;
        verifyCommitOrAbortTransactionRetriable(commit, commit);
    }

    /** Running the shared retry scenario with COMMIT first and ABORT second must throw IllegalStateException. */
    @Test
    public void testRetryAbortTransactionAfterCommitTimeout() {
        Assertions.assertThrows(
                IllegalStateException.class,
                () -> verifyCommitOrAbortTransactionRetriable(TransactionResult.COMMIT, TransactionResult.ABORT));
    }

    /** Running the shared retry scenario with ABORT first and COMMIT second must throw IllegalStateException. */
    @Test
    public void testRetryCommitTransactionAfterAbortTimeout() {
        Assertions.assertThrows(
                IllegalStateException.class,
                () -> verifyCommitOrAbortTransactionRetriable(TransactionResult.ABORT, TransactionResult.COMMIT));
    }

    /**
     * {@code canBumpEpoch()} must stay true after the transaction coordinator's entry has been
     * removed from {@code apiVersions}.
     */
    @Test
    public void testCanBumpEpochDuringCoordinatorDisconnect() {
        final FindCoordinatorRequest.CoordinatorType txnType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        doInitTransactions(0L, (short) 0);
        runUntil(() -> transactionManager.coordinator(txnType) != null);
        Assertions.assertTrue(transactionManager.canBumpEpoch());

        // Drop the coordinator's API version information and check again.
        String coordinatorId = transactionManager.coordinator(txnType).idString();
        apiVersions.remove(coordinatorId);
        Assertions.assertTrue(transactionManager.canBumpEpoch());
    }

    /**
     * Idempotent-producer scenario with two partitions. After tp0 receives UNKNOWN_PRODUCER_ID and
     * the producer reaches epoch 2, the in-flight tp1 batch is expected to keep epoch 1 until it
     * completes. The following tp1 batch is expected to carry epoch 2, and completing it is
     * expected to leave tp1 with nothing in flight and sequence number 1.
     */
    @Test
    public void testFailedInflightBatchAfterEpochBump() throws InterruptedException {
        initializeTransactionManager(Optional.empty());
        // A separate Sender instance that is stepped manually through runOnce() below.
        Sender manualSender = new Sender(logContext, client, metadata, accumulator, false, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES,
                new SenderMetricsRegistry(new Metrics(time)), time, 1000, 50L, transactionManager, apiVersions);
        initializeIdempotentProducerId(13131L, (short) 1);

        ProducerBatch tp0First = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
        ProducerBatch tp0Second = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
        writeIdempotentBatchWithValue(transactionManager, tp0, "3");
        ProducerBatch tp1First = writeIdempotentBatchWithValue(transactionManager, tp1, "4");
        ProducerBatch tp1Second = writeIdempotentBatchWithValue(transactionManager, tp1, "5");
        Assertions.assertEquals(3, transactionManager.sequenceNumber(tp0));
        Assertions.assertEquals(2, transactionManager.sequenceNumber(tp1));

        // The first batch of each partition completes successfully.
        long appendTime = time.milliseconds();
        ProduceResponse.PartitionResponse tp0FirstOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp0First.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp0First, tp0FirstOk);

        ProduceResponse.PartitionResponse tp1FirstOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp1First.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp1First, tp1FirstOk);

        // The second tp0 batch gets UNKNOWN_PRODUCER_ID; the test then waits for epoch 2.
        ProduceResponse.PartitionResponse tp0SecondError =
                new ProduceResponse.PartitionResponse(Errors.UNKNOWN_PRODUCER_ID, -1L, -1L, 500L);
        Assertions.assertTrue(transactionManager.canRetry(tp0SecondError, tp0Second));
        runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);

        // tp0: the pending batch has been rewritten to sequence 0 / epoch 2.
        Assertions.assertEquals(tp0Second, transactionManager.nextBatchBySequence(tp0));
        Assertions.assertEquals(0, transactionManager.firstInFlightSequence(tp0));
        Assertions.assertEquals(0, tp0Second.baseSequence());
        Assertions.assertTrue(tp0Second.sequenceHasBeenReset());
        Assertions.assertEquals(2, tp0Second.producerEpoch());

        // tp1: the pending batch is untouched (sequence 1 / epoch 1).
        Assertions.assertEquals(tp1Second, transactionManager.nextBatchBySequence(tp1));
        Assertions.assertEquals(1, transactionManager.firstInFlightSequence(tp1));
        Assertions.assertEquals(1, tp1Second.baseSequence());
        Assertions.assertFalse(tp1Second.sequenceHasBeenReset());
        Assertions.assertEquals(1, tp1Second.producerEpoch());

        // A freshly appended tp1 record stays queued after one sender pass.
        appendToAccumulator(tp1);
        manualSender.runOnce();
        Assertions.assertEquals(1, accumulator.getDeque(tp1).size());

        // The in-flight tp1 batch is retried after NOT_LEADER_OR_FOLLOWER and keeps epoch 1.
        ProduceResponse.PartitionResponse tp1SecondError =
                new ProduceResponse.PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L, 600L);
        Assertions.assertTrue(transactionManager.canRetry(tp1SecondError, tp1Second));
        accumulator.reenqueue(tp1Second, time.milliseconds());
        manualSender.runOnce();
        Assertions.assertEquals(1, accumulator.getDeque(tp1).size());
        Assertions.assertNotEquals(tp1Second, accumulator.getDeque(tp1).peek());
        Assertions.assertEquals((short) 1, tp1Second.producerEpoch());

        // Completing the retried batch leaves tp1 with nothing in flight and sequence 0.
        ProduceResponse.PartitionResponse tp1SecondOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp1Second.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp1Second, tp1SecondOk);
        transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
        Assertions.assertFalse(transactionManager.hasInflightBatches(tp1));
        Assertions.assertEquals(0, transactionManager.sequenceNumber(tp1));

        // The queued tp1 record is drained next and carries epoch 2.
        runUntil(() -> transactionManager.hasInflightBatches(tp1));
        Assertions.assertTrue(accumulator.getDeque(tp1).isEmpty());
        ProducerBatch tp1Third = transactionManager.nextBatchBySequence(tp1);
        Assertions.assertEquals(2, tp1Third.producerEpoch());

        // This test does not call maybeUpdateProducerIdAndEpoch after the final completion.
        ProduceResponse.PartitionResponse tp1ThirdOk = new ProduceResponse.PartitionResponse(Errors.NONE, 500L, appendTime, 0L);
        tp1Third.complete(500L, appendTime);
        transactionManager.handleCompletedBatch(tp1Third, tp1ThirdOk);
        Assertions.assertFalse(transactionManager.hasInflightBatches(tp1));
        Assertions.assertEquals(1, transactionManager.sequenceNumber(tp1));
    }

    /**
     * With poison-on-invalid-transition enabled, a {@code handleFailedBatch} call that throws
     * IllegalStateException must leave the manager with a fatal error, after which every
     * transactional entry point exercised below also throws IllegalStateException.
     */
    @Test
    public void testBackgroundInvalidStateTransitionIsFatal() {
        doInitTransactions();
        Assertions.assertTrue(transactionManager.isTransactional());

        transactionManager.setPoisonStateOnInvalidTransition(true);

        // Failing a batch at this point is rejected and poisons the manager.
        Assertions.assertThrows(IllegalStateException.class,
                () -> transactionManager.handleFailedBatch(batchWithValue(tp0, "test"), new KafkaException(), false));
        Assertions.assertTrue(transactionManager.hasFatalError());

        // Every subsequent operation is rejected as well.
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginTransaction());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginAbort());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginCommit());
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.maybeAddPartition(tp0));
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.initializeTransactions());
        Assertions.assertThrows(IllegalStateException.class,
                () -> transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata("fake-group-id")));
    }

    /**
     * A {@code beginAbort()} call that throws IllegalStateException before initialization must not
     * leave a fatal error: the manager can still be initialized and run a transaction through to
     * an acknowledged commit.
     */
    @Test
    public void testForegroundInvalidStateTransitionIsRecoverable() {
        // The invalid call is rejected, but the manager is not poisoned.
        Assertions.assertThrows(IllegalStateException.class, () -> transactionManager.beginAbort());
        Assertions.assertFalse(transactionManager.hasFatalError());

        doInitTransactions();
        Assertions.assertTrue(transactionManager.isTransactional());

        transactionManager.beginTransaction();
        Assertions.assertFalse(transactionManager.hasFatalError());

        transactionManager.maybeAddPartition(tp1);
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(tp1, Errors.NONE);
        runUntil(() -> transactionManager.isPartitionAdded(tp1));

        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        Assertions.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        runUntil(() -> !transactionManager.hasOngoingTransaction());

        runUntil(commitResult::isCompleted);
        commitResult.await();
        runUntil(commitResult::isAcked);
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Appends one fixed "key"/"value" record for the given partition to the accumulator, using the
     * current mock time, and returns the future of that append.
     */
    private FutureRecordMetadata appendToAccumulator(TopicPartition topicPartition) throws InterruptedException {
        final long now = time.milliseconds();
        final byte[] keyBytes = "key".getBytes();
        final byte[] valueBytes = "value".getBytes();
        return accumulator.append(
                topicPartition.topic(),
                topicPartition.partition(),
                now,
                keyBytes,
                valueBytes,
                Record.EMPTY_HEADERS,
                (RecordAccumulator.AppendCallbacks) null,
                1000L,
                false,
                now,
                TestUtils.singletonCluster()).future;
    }

    /**
     * Shared scenario: start a transaction, issue a first commit/abort whose EndTxn response is
     * prepared with a trailing {@code true} flag, check that the result does not complete within
     * the await timeout, then issue a second commit/abort and run it to completion.
     *
     * @param transactionResult  operation issued first (COMMIT selects beginCommit, anything else beginAbort)
     * @param transactionResult2 operation issued on the second attempt, selected the same way
     */
    private void verifyCommitOrAbortTransactionRetriable(TransactionResult transactionResult, TransactionResult transactionResult2) throws InterruptedException {
        final long pid = 13131L;
        final short producerEpoch = (short) 1;

        doInitTransactions();
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartition(tp0);
        appendToAccumulator(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, pid);
        prepareProduceResponse(Errors.NONE, pid, producerEpoch);
        runUntil(() -> !client.hasPendingResponses());

        // First attempt.
        final TransactionalRequestResult firstAttempt;
        if (transactionResult == TransactionResult.COMMIT) {
            firstAttempt = transactionManager.beginCommit();
        } else {
            firstAttempt = transactionManager.beginAbort();
        }
        prepareEndTxnResponse(Errors.NONE, transactionResult, pid, producerEpoch, true);
        runUntil(() -> !client.hasPendingResponses());
        Assertions.assertFalse(firstAttempt.isCompleted());
        Assertions.assertThrows(TimeoutException.class, () -> firstAttempt.await(1000L, TimeUnit.MILLISECONDS));

        // Serve a coordinator lookup before the second attempt.
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> !client.hasPendingResponses());

        // Second attempt; it is asserted to be equal to the first attempt's result.
        final TransactionalRequestResult secondAttempt;
        if (transactionResult2 == TransactionResult.COMMIT) {
            secondAttempt = transactionManager.beginCommit();
        } else {
            secondAttempt = transactionManager.beginAbort();
        }
        Assertions.assertEquals(secondAttempt, firstAttempt);
        prepareEndTxnResponse(Errors.NONE, transactionResult2, pid, producerEpoch, false);
        runUntil(secondAttempt::isCompleted);
        Assertions.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Queues an AddPartitionsToTxn response on the mock client whose per-partition errors come
     * from {@code map}. The matcher asserts that the request names exactly the partitions that are
     * keys of {@code map}, ignoring order and duplicates.
     *
     * @param map partition-to-error mapping placed in the response
     */
    private void prepareAddPartitionsToTxn(Map<TopicPartition, Errors> map) {
        AddPartitionsToTxnResponseData responseData = new AddPartitionsToTxnResponseData()
                .setResultsByTopicV3AndBelow(AddPartitionsToTxnResponse.resultForTransaction("", map).topicResults())
                .setThrottleTimeMs(0);
        client.prepareResponse(request -> {
            // Expected value first, so a failure message labels the two sides correctly.
            Assertions.assertEquals(
                    new HashSet<>(map.keySet()),
                    new HashSet<>(getPartitionsFromV3Request((AddPartitionsToTxnRequest) request)));
            return true;
        }, new AddPartitionsToTxnResponse(responseData));
    }

    /** Single-partition convenience overload: wraps the pair in a singleton map and delegates. */
    private void prepareAddPartitionsToTxn(TopicPartition topicPartition, Errors errors) {
        final Map<TopicPartition, Errors> singleResult = Collections.singletonMap(topicPartition, errors);
        prepareAddPartitionsToTxn(singleResult);
    }

    /**
     * Queues a FindCoordinator response pointing at {@code brokerNode}. The matcher asserts the
     * request's coordinator type and its key, which is read from {@code key()} when
     * {@code coordinatorKeys()} is empty and from the first entry of {@code coordinatorKeys()} otherwise.
     *
     * @param errors          error placed in the response
     * @param z               forwarded unchanged as the last argument of {@code prepareResponse}
     *                        (presumably the "disconnected" flag; confirm against MockClient)
     * @param coordinatorType coordinator type the request is expected to carry
     * @param str             coordinator key the request is expected to carry
     */
    private void prepareFindCoordinatorResponse(Errors errors, boolean z, FindCoordinatorRequest.CoordinatorType coordinatorType, String str) {
        client.prepareResponse(request -> {
            FindCoordinatorRequest lookup = (FindCoordinatorRequest) request;
            Assertions.assertEquals(coordinatorType, FindCoordinatorRequest.CoordinatorType.forId(lookup.data().keyType()));
            String requestedKey;
            if (lookup.data().coordinatorKeys().isEmpty()) {
                requestedKey = lookup.data().key();
            } else {
                requestedKey = lookup.data().coordinatorKeys().get(0);
            }
            Assertions.assertEquals(str, requestedKey);
            return true;
        }, FindCoordinatorResponse.prepareResponse(errors, str, brokerNode), z);
    }

    /**
     * Queues an InitProducerId response carrying the given error, producer id and epoch. The
     * matcher asserts that the request uses transactional id "foobar" and a transaction timeout of 1121 ms.
     *
     * @param errors error placed in the response
     * @param z      forwarded unchanged as the last argument of {@code prepareResponse}
     *               (presumably the "disconnected" flag; confirm against MockClient)
     * @param j      producer id placed in the response
     * @param s      producer epoch placed in the response
     */
    private void prepareInitPidResponse(Errors errors, boolean z, long j, short s) {
        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
                .setErrorCode(errors.code())
                .setProducerEpoch(s)
                .setProducerId(j)
                .setThrottleTimeMs(0);
        client.prepareResponse(request -> {
            InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
            Assertions.assertEquals("foobar", initRequest.data().transactionalId());
            Assertions.assertEquals(1121, initRequest.data().transactionTimeoutMs());
            return true;
        }, new InitProducerIdResponse(responseData), z);
    }

    /** Convenience overload of {@code sendProduceResponse} that targets {@code tp0}. */
    private void sendProduceResponse(Errors errors, long j, short s) {
        final TopicPartition target = tp0;
        sendProduceResponse(errors, j, s, target);
    }

    /**
     * Calls {@code client.respond} with a produce response for the given partition carrying
     * {@code errors}, matched against a request with producer id {@code j} and epoch {@code s}.
     */
    private void sendProduceResponse(Errors errors, long j, short s, TopicPartition topicPartition) {
        MockClient.RequestMatcher matcher = produceRequestMatcher(j, s, topicPartition);
        ProduceResponse response = produceResponse(topicPartition, 0L, errors, 0);
        client.respond(matcher, response);
    }

    /** Convenience overload of {@code prepareProduceResponse} that targets {@code tp0}. */
    private void prepareProduceResponse(Errors errors, long j, short s) {
        final TopicPartition target = tp0;
        prepareProduceResponse(errors, j, s, target);
    }

    /**
     * Queues a produce response for the given partition carrying {@code errors}, matched against
     * a request with producer id {@code j} and epoch {@code s}.
     */
    private void prepareProduceResponse(Errors errors, long j, short s, TopicPartition topicPartition) {
        MockClient.RequestMatcher matcher = produceRequestMatcher(j, s, topicPartition);
        ProduceResponse response = produceResponse(topicPartition, 0L, errors, 0);
        client.prepareResponse(matcher, response);
    }

    /**
     * Builds a matcher for produce requests. It locates the records sent for {@code topicPartition}
     * and asserts that they consist of exactly one batch, that the batch is transactional and
     * carries the given producer id and epoch, and that the request's transactional id is "foobar".
     *
     * @param j              expected producer id of the batch
     * @param s              expected producer epoch of the batch
     * @param topicPartition partition whose records are inspected
     * @return a matcher that returns true once all assertions have passed
     */
    private MockClient.RequestMatcher produceRequestMatcher(long j, short s, TopicPartition topicPartition) {
        return request -> {
            ProduceRequest produceRequest = (ProduceRequest) request;
            // The lookup fails with NoSuchElementException if the request has no data for the partition.
            MemoryRecords records = (MemoryRecords) ((ProduceRequestData.TopicProduceData) produceRequest.data().topicData().stream()
                    .filter(topicData -> topicData.name().equals(topicPartition.topic()))
                    .findAny()
                    .get())
                    .partitionData().stream()
                    .filter(partitionData -> partitionData.index() == topicPartition.partition())
                    .map(partitionData -> partitionData.records())
                    .findAny()
                    .get();
            Assertions.assertNotNull(records);

            // Typed iterator: no raw type and no unchecked downcast on next().
            Iterator<MutableRecordBatch> batches = records.batches().iterator();
            Assertions.assertTrue(batches.hasNext());
            MutableRecordBatch onlyBatch = batches.next();
            Assertions.assertFalse(batches.hasNext());

            Assertions.assertTrue(onlyBatch.isTransactional());
            Assertions.assertEquals(j, onlyBatch.producerId());
            Assertions.assertEquals(s, onlyBatch.producerEpoch());
            Assertions.assertEquals("foobar", produceRequest.transactionalId());
            return true;
        };
    }

    /**
     * Queues an AddPartitionsToTxn response that reports {@code errors} for the single given
     * partition, matched against a request carrying that partition, epoch {@code s} and producer id {@code j}.
     */
    private void prepareAddPartitionsToTxnResponse(Errors errors, TopicPartition topicPartition, short s, long j) {
        AddPartitionsToTxnResponseData responseData = new AddPartitionsToTxnResponseData()
                .setThrottleTimeMs(0)
                .setResultsByTopicV3AndBelow(
                        AddPartitionsToTxnResponse.resultForTransaction("", Collections.singletonMap(topicPartition, errors)).topicResults());
        client.prepareResponse(addPartitionsRequestMatcher(topicPartition, s, j), new AddPartitionsToTxnResponse(responseData));
    }

    /**
     * Calls {@code client.respond} with an AddPartitionsToTxn response that reports {@code errors}
     * for the single given partition, matched against a request carrying that partition,
     * epoch {@code s} and producer id {@code j}.
     */
    private void sendAddPartitionsToTxnResponse(Errors errors, TopicPartition topicPartition, short s, long j) {
        AddPartitionsToTxnResponseData responseData = new AddPartitionsToTxnResponseData()
                .setThrottleTimeMs(0)
                .setResultsByTopicV3AndBelow(
                        AddPartitionsToTxnResponse.resultForTransaction("", Collections.singletonMap(topicPartition, errors)).topicResults());
        client.respond(addPartitionsRequestMatcher(topicPartition, s, j), new AddPartitionsToTxnResponse(responseData));
    }

    /**
     * Builds a matcher for AddPartitionsToTxn requests that asserts the producer id, the producer
     * epoch, that the partition list is exactly {@code [topicPartition]}, and that the
     * transactional id is "foobar" (all read from the v3-and-below request fields).
     */
    private MockClient.RequestMatcher addPartitionsRequestMatcher(TopicPartition topicPartition, short s, long j) {
        return request -> {
            AddPartitionsToTxnRequest addPartitions = (AddPartitionsToTxnRequest) request;
            Assertions.assertEquals(j, addPartitions.data().v3AndBelowProducerId());
            Assertions.assertEquals(s, addPartitions.data().v3AndBelowProducerEpoch());
            Assertions.assertEquals(
                    Collections.singletonList(topicPartition),
                    getPartitionsFromV3Request(addPartitions));
            Assertions.assertEquals("foobar", addPartitions.data().v3AndBelowTransactionalId());
            return true;
        };
    }

    /** Returns the partitions listed in the request's v3-and-below topics field. */
    private List<TopicPartition> getPartitionsFromV3Request(AddPartitionsToTxnRequest addPartitionsToTxnRequest) {
        final List<TopicPartition> partitions =
                AddPartitionsToTxnRequest.getPartitions(addPartitionsToTxnRequest.data().v3AndBelowTopics());
        return partitions;
    }

    /** Four-argument overload: delegates to the five-argument form with {@code false} as the final flag. */
    private void prepareEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s) {
        final boolean lastFlag = false;
        prepareEndTxnResponse(errors, transactionResult, j, s, lastFlag);
    }

    /**
     * Queues an EndTxn response carrying {@code errors}, matched against a request with the given
     * result, producer id {@code j} and epoch {@code s}.
     *
     * @param z forwarded unchanged as the last argument of {@code prepareResponse}
     *          (presumably the "disconnected" flag; confirm against MockClient)
     */
    private void prepareEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s, boolean z) {
        EndTxnResponseData responseData = new EndTxnResponseData()
                .setErrorCode(errors.code())
                .setThrottleTimeMs(0);
        client.prepareResponse(endTxnMatcher(transactionResult, j, s), new EndTxnResponse(responseData), z);
    }

    /**
     * Calls {@code client.respond} with an EndTxn response carrying {@code errors}, matched
     * against a request with the given result, producer id {@code j} and epoch {@code s}.
     */
    private void sendEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s) {
        EndTxnResponseData responseData = new EndTxnResponseData()
                .setErrorCode(errors.code())
                .setThrottleTimeMs(0);
        client.respond(endTxnMatcher(transactionResult, j, s), new EndTxnResponse(responseData));
    }

    /**
     * Builds a matcher for EndTxn requests that asserts transactional id "foobar", the given
     * producer id and epoch, and the given transaction result.
     */
    private MockClient.RequestMatcher endTxnMatcher(TransactionResult transactionResult, long j, short s) {
        return request -> {
            EndTxnRequest endTxn = (EndTxnRequest) request;
            Assertions.assertEquals("foobar", endTxn.data().transactionalId());
            Assertions.assertEquals(j, endTxn.data().producerId());
            Assertions.assertEquals(s, endTxn.data().producerEpoch());
            Assertions.assertEquals(transactionResult, endTxn.result());
            return true;
        };
    }

    /**
     * Queues an AddOffsetsToTxn response carrying {@code errors}. The matcher asserts the group id
     * {@code str}, transactional id "foobar", producer id {@code j} and producer epoch {@code s}.
     */
    private void prepareAddOffsetsToTxnResponse(Errors errors, String str, long j, short s) {
        AddOffsetsToTxnResponseData responseData = new AddOffsetsToTxnResponseData().setErrorCode(errors.code());
        client.prepareResponse(request -> {
            AddOffsetsToTxnRequest addOffsets = (AddOffsetsToTxnRequest) request;
            Assertions.assertEquals(str, addOffsets.data().groupId());
            Assertions.assertEquals("foobar", addOffsets.data().transactionalId());
            Assertions.assertEquals(j, addOffsets.data().producerId());
            Assertions.assertEquals(s, addOffsets.data().producerEpoch());
            return true;
        }, new AddOffsetsToTxnResponse(responseData));
    }

    /**
     * Queues a {@code TxnOffsetCommitResponse} built from {@code 0} and the given per-partition
     * errors. The paired matcher asserts the request's group id, producer id and producer epoch.
     *
     * @param str expected consumer group id on the request
     * @param j   expected producer id on the request
     * @param s   expected producer epoch on the request
     * @param map per-partition errors handed to the response constructor
     */
    private void prepareTxnOffsetCommitResponse(String str, long j, short s, Map<TopicPartition, Errors> map) {
        MockClient.RequestMatcher matcher = request -> {
            TxnOffsetCommitRequest commit = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals(str, commit.data().groupId());
            Assertions.assertEquals(j, commit.data().producerId());
            Assertions.assertEquals(s, commit.data().producerEpoch());
            return true;
        };
        TxnOffsetCommitResponse response = new TxnOffsetCommitResponse(0, map);
        client.prepareResponse(matcher, response);
    }

    /**
     * Queues a {@code TxnOffsetCommitResponse} built from {@code 0} and the given per-partition
     * errors. In addition to group id, producer id and producer epoch, the paired matcher asserts
     * the request's group instance id, member id and generation id.
     *
     * @param str  expected consumer group id on the request
     * @param j    expected producer id on the request
     * @param s    expected producer epoch on the request
     * @param str2 expected group instance id on the request
     * @param str3 expected member id on the request
     * @param i    expected generation id on the request
     * @param map  per-partition errors handed to the response constructor
     */
    private void prepareTxnOffsetCommitResponse(String str, long j, short s, String str2, String str3, int i, Map<TopicPartition, Errors> map) {
        MockClient.RequestMatcher matcher = request -> {
            TxnOffsetCommitRequest commit = (TxnOffsetCommitRequest) request;
            Assertions.assertEquals(str, commit.data().groupId());
            Assertions.assertEquals(j, commit.data().producerId());
            Assertions.assertEquals(s, commit.data().producerEpoch());
            Assertions.assertEquals(str2, commit.data().groupInstanceId());
            Assertions.assertEquals(str3, commit.data().memberId());
            Assertions.assertEquals(i, commit.data().generationId());
            return true;
        };
        TxnOffsetCommitResponse response = new TxnOffsetCommitResponse(0, map);
        client.prepareResponse(matcher, response);
    }

    /**
     * Convenience overload of the five-argument {@code produceResponse} that supplies {@code 10}
     * as the final argument.
     *
     * @param topicPartition partition the response is keyed by
     * @param j              forwarded as the second argument
     * @param errors         error for the partition
     * @param i              forwarded as the fourth argument
     * @return the response built by the five-argument overload
     */
    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i) {
        final int defaultLastArgument = 10;
        return produceResponse(topicPartition, j, errors, i, defaultLastArgument);
    }

    /**
     * Builds a {@code ProduceResponse} holding a single partition entry.
     *
     * @param topicPartition key of the single-entry partition map
     * @param j              second argument of the {@code PartitionResponse} constructor
     *                       (presumably the base offset; confirm against ProduceResponse)
     * @param errors         error recorded for the partition
     * @param i              second argument of the {@code ProduceResponse} constructor
     *                       (presumably the throttle time; confirm against ProduceResponse)
     * @param i2             fourth argument of the {@code PartitionResponse} constructor
     *                       (presumably the log start offset; confirm against ProduceResponse)
     * @return the assembled response
     */
    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i, int i2) {
        ProduceResponse.PartitionResponse partitionResponse =
                new ProduceResponse.PartitionResponse(errors, j, -1L, i2);
        Map<TopicPartition, ProduceResponse.PartitionResponse> responses =
                Collections.singletonMap(topicPartition, partitionResponse);
        return new ProduceResponse(responses, i);
    }

    /**
     * Queues a successful {@code InitProducerIdResponse} carrying the given producer id and epoch,
     * then runs the sender until the transaction manager reports that it has a producer id. The
     * paired matcher asserts that the request carries no transactional id.
     *
     * @param j producer id placed in the response
     * @param s producer epoch placed in the response
     */
    private void initializeIdempotentProducerId(long j, short s) {
        MockClient.RequestMatcher matcher = request -> {
            InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
            Assertions.assertNull(initRequest.data().transactionalId());
            return true;
        };
        InitProducerIdResponseData payload = new InitProducerIdResponseData()
                .setErrorCode(Errors.NONE.code())
                .setProducerEpoch(s)
                .setProducerId(j)
                .setThrottleTimeMs(0);
        client.prepareResponse(matcher, new InitProducerIdResponse(payload), false);
        // Binding the method reference null-checks the receiver, so no explicit check is needed.
        runUntil(transactionManager::hasProducerId);
    }

    /**
     * Runs the two-argument {@code doInitTransactions} with producer id {@code 13131} and
     * epoch {@code 1}.
     */
    private void doInitTransactions() {
        final long producerId = 13131L;
        final short epoch = 1;
        doInitTransactions(producerId, epoch);
    }

    /**
     * Drives {@code initializeTransactions()} to completion against the mock client.
     *
     * <p>Steps: start initialization, prepare a FindCoordinator response, run until the
     * transaction coordinator is known and verify it is {@code brokerNode}, prepare an
     * InitProducerId response with the given id and epoch, run until a producer id is present,
     * then await the result and assert it is successful and acked.
     *
     * @param j producer id returned by the prepared InitProducerId response
     * @param s producer epoch returned by the prepared InitProducerId response
     */
    private void doInitTransactions(long j, short s) {
        TransactionalRequestResult result = transactionManager.initializeTransactions();

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        runUntil(() -> transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION) != null);
        Assertions.assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        prepareInitPidResponse(Errors.NONE, false, j, s);
        runUntil(transactionManager::hasProducerId);

        result.await();
        Assertions.assertTrue(result.isSuccessful());
        Assertions.assertTrue(result.isAcked());
    }

    /**
     * Verifies that the transaction manager is in an error state that an abort can clear:
     * {@code beginCommit()} must throw a {@code KafkaException} whose cause is assignable to
     * {@code cls}, the error must still be set afterwards, and {@code beginAbort()} must clear it.
     *
     * @param cls expected type (or supertype) of the exception's cause
     */
    private void assertAbortableError(Class<? extends RuntimeException> cls) {
        try {
            transactionManager.beginCommit();
            Assertions.fail("Should have raised " + cls.getSimpleName());
        } catch (KafkaException raised) {
            Class<?> causeType = raised.getCause().getClass();
            Assertions.assertTrue(cls.isAssignableFrom(causeType));
            Assertions.assertTrue(transactionManager.hasError());
        }

        // The failed commit must leave the error in place; only the abort clears it.
        Assertions.assertTrue(transactionManager.hasError());
        transactionManager.beginAbort();
        Assertions.assertFalse(transactionManager.hasError());
    }

    /**
     * Verifies that the transaction manager is in an error state that an abort cannot clear:
     * {@code beginAbort()} is attempted twice, and each attempt must throw a
     * {@code KafkaException} whose cause is assignable to {@code cls} while the error stays set.
     *
     * @param cls expected type (or supertype) of the exception's cause
     */
    private void assertFatalError(Class<? extends RuntimeException> cls) {
        Assertions.assertTrue(transactionManager.hasError());

        // Second pass confirms the first abort attempt did not reset the error state.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                transactionManager.beginAbort();
                Assertions.fail("Should have raised " + cls.getSimpleName());
            } catch (KafkaException raised) {
                Class<?> causeType = raised.getCause().getClass();
                Assertions.assertTrue(cls.isAssignableFrom(causeType));
                Assertions.assertTrue(transactionManager.hasError());
            }
        }
    }

    /**
     * Asserts that the given produce future has already completed and that retrieving its value
     * throws an {@code ExecutionException}.
     *
     * @param future produce future expected to have failed
     * @throws InterruptedException if {@code future.get()} is interrupted
     */
    private void assertProduceFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
        Assertions.assertTrue(future.isDone());

        boolean failedAsExpected = false;
        try {
            future.get();
        } catch (ExecutionException expected) {
            // This is the outcome under test; the exception itself is not inspected.
            failedAsExpected = true;
        }
        if (!failedAsExpected) {
            Assertions.fail("Expected produce future to throw");
        }
    }

    /**
     * Delegates to {@code ProducerTestUtils.runUntil} with this test's sender.
     *
     * @param supplier condition handed to {@code ProducerTestUtils.runUntil}
     */
    private void runUntil(Supplier<Boolean> supplier) {
        ProducerTestUtils.runUntil(sender, supplier);
    }

    /**
     * Creates a Mockito mock of {@code Metadata} backed by the given cluster.
     *
     * <p>{@code fetch()} is stubbed to return {@code cluster}. For every partition of every topic
     * in the cluster, {@code currentLeader(...)} is stubbed to return that partition's leader
     * paired with leader epoch {@code 999}.
     *
     * @param cluster cluster whose topics and partitions drive the stubbing
     * @return the configured mock
     */
    private Metadata setupMetadata(Cluster cluster) {
        Metadata metadata = Mockito.mock(Metadata.class);
        Mockito.when(metadata.fetch()).thenReturn(cluster);
        // Typed enhanced-for loops replace the raw Iterator and its unchecked String cast.
        for (String topic : cluster.topics()) {
            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                Metadata.LeaderAndEpoch leaderAndEpoch =
                        new Metadata.LeaderAndEpoch(Optional.of(partitionInfo.leader()), Optional.of(999));
                Mockito.when(metadata.currentLeader(topicPartition)).thenReturn(leaderAndEpoch);
            }
        }
        return metadata;
    }
}
