/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.record;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RecordValidationStats;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.support.ParameterDeclarations;

public class MemoryRecordsBuilderTest {
    // Clock instance held by the test class; bound to Time.SYSTEM.
    private final Time time = Time.SYSTEM;

    /**
     * Looks up, via reflection, the two zero-argument build methods declared on
     * {@link MemoryRecordsBuilder}: {@code build} and {@code buildWithClonedBuffer}.
     *
     * @return a fixed-size list holding the two methods, in that order
     * @throws RuntimeException wrapping the {@link NoSuchMethodException} if a lookup fails
     */
    private static List<Method> getBuildFns() {
        try {
            Method plainBuild = MemoryRecordsBuilder.class.getDeclaredMethod("build");
            Method clonedBuild = MemoryRecordsBuilder.class.getDeclaredMethod("buildWithClonedBuffer");
            return Arrays.asList(plainBuild, clonedBuild);
        } catch (NoSuchMethodException lookupFailure) {
            throw new RuntimeException(lookupFailure);
        }
    }

    /**
     * Verifies that constructing a builder with ZStandard compression is rejected with an
     * {@link IllegalArgumentException} for magic values 0 and 1, and that the exception
     * message names the offending magic value.
     */
    @Test
    public void testUnsupportedCompress() {
        BiFunction<Byte, Compression, MemoryRecordsBuilder> builderBiFunction = (magic, compression) ->
                new MemoryRecordsBuilder(ByteBuffer.allocate(128), magic.byteValue(), compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, 128);
        Arrays.asList((byte) 0, (byte) 1).forEach(magic -> {
            IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class,
                    () -> builderBiFunction.apply(magic, Compression.zstd().build()));
            // JUnit's contract is assertEquals(expected, actual): the literal goes first so a
            // failure report labels the two values correctly.
            Assertions.assertEquals("ZStandard compression is not supported for magic " + magic, e.getMessage());
        });
    }

    /**
     * Builds from a builder that had nothing appended and checks that the result is zero
     * bytes long and that the buffer position still equals the configured buffer offset.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteEmptyRecordSet(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        MemoryRecordsBuilder emptyBuilder = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());

        MemoryRecords built = callBuildFn(emptyBuilder, args.buildFn);

        Assertions.assertEquals(0, built.sizeInBytes());
        Assertions.assertEquals(args.bufferOffset, target.position());
    }

    /**
     * A builder created with the transactional flag set must be rejected by the constructor
     * for magic values below 2; for other magic values the single batch it produces must
     * report itself as transactional.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteTransactionalRecordSet(Args args) {
        ByteBuffer buffer = allocateBuffer(128, args);
        long pid = 9809L;
        short epoch = 15;
        int sequence = 2342;
        Supplier<MemoryRecordsBuilder> supplier = () -> new MemoryRecordsBuilder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false, -1, buffer.capacity());
        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        MemoryRecordsBuilder builder = supplier.get();
        builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
        MemoryRecords records = callBuildFn(builder, args.buildFn);
        // Typed list instead of a raw List, so the element needs no unchecked cast.
        List<MutableRecordBatch> batches = Utils.toList(records.batches().iterator());
        Assertions.assertEquals(1, batches.size());
        Assertions.assertTrue(batches.get(0).isTransactional());
    }

    /**
     * Uses a producer id of -1 with the transactional flag set. For magic values below 2 the
     * constructor must throw; otherwise closing the builder must throw
     * {@link IllegalArgumentException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteTransactionalWithInvalidPID(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        long invalidPid = -1L;
        short producerEpoch = 15;
        int baseSequence = 2342;
        Supplier<MemoryRecordsBuilder> builderFactory = () -> new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, invalidPid, producerEpoch, baseSequence, true, false, -1, target.capacity());

        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, builderFactory::get);
            return;
        }
        MemoryRecordsBuilder created = builderFactory.get();
        Assertions.assertThrows(IllegalArgumentException.class, created::close);
    }

    /**
     * Uses a producer epoch of -1 together with a valid producer id and sequence. For magic
     * values below 2 the constructor must throw; otherwise closing the builder must throw
     * {@link IllegalArgumentException}.
     *
     * NOTE(review): the builder is created with the same {@code true} flag the transactional
     * tests use, although the test name says "idempotent" - confirm this is intended.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteIdempotentWithInvalidEpoch(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        long producerId = 9809L;
        short invalidEpoch = -1;
        int baseSequence = 2342;
        Supplier<MemoryRecordsBuilder> builderFactory = () -> new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, producerId, invalidEpoch, baseSequence, true, false, -1, target.capacity());

        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, builderFactory::get);
            return;
        }
        MemoryRecordsBuilder created = builderFactory.get();
        Assertions.assertThrows(IllegalArgumentException.class, created::close);
    }

    /**
     * Uses a base sequence of -1 together with a valid producer id and epoch. For magic
     * values below 2 the constructor must throw; otherwise closing the builder must throw
     * {@link IllegalArgumentException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteIdempotentWithInvalidBaseSequence(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        long producerId = 9809L;
        short producerEpoch = 15;
        int invalidSequence = -1;
        Supplier<MemoryRecordsBuilder> builderFactory = () -> new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, producerId, producerEpoch, invalidSequence, true, false, -1, target.capacity());

        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, builderFactory::get);
            return;
        }
        MemoryRecordsBuilder created = builderFactory.get();
        Assertions.assertThrows(IllegalArgumentException.class, created::close);
    }

    /**
     * Creates a builder with the transactional flag off and the following boolean flag on
     * (presumably the control-batch flag - confirm against the constructor). For magic values
     * below 2 the constructor must throw; otherwise appending an ABORT end-transaction marker
     * must throw {@link IllegalArgumentException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteEndTxnMarkerNonTransactionalBatch(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        long producerId = 9809L;
        short producerEpoch = 15;
        int baseSequence = -1;
        Supplier<MemoryRecordsBuilder> builderFactory = () -> new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, producerId, producerEpoch, baseSequence, false, true, -1, target.capacity());

        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, builderFactory::get);
            return;
        }
        MemoryRecordsBuilder created = builderFactory.get();
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> created.appendEndTxnMarker(-1L, new EndTransactionMarker(ControlRecordType.ABORT, 0)));
    }

    /**
     * Creates a builder with the transactional flag on and the following boolean flag off
     * (presumably the control-batch flag - confirm against the constructor). For magic values
     * below 2 the constructor must throw; otherwise appending an ABORT end-transaction marker
     * must throw {@link IllegalArgumentException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteEndTxnMarkerNonControlBatch(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        long producerId = 9809L;
        short producerEpoch = 15;
        int baseSequence = -1;
        Supplier<MemoryRecordsBuilder> builderFactory = () -> new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, producerId, producerEpoch, baseSequence, true, false, -1, target.capacity());

        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, builderFactory::get);
            return;
        }
        MemoryRecordsBuilder created = builderFactory.get();
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> created.appendEndTxnMarker(-1L, new EndTransactionMarker(ControlRecordType.ABORT, 0)));
    }

    /**
     * Creates a builder whose last-but-one constructor argument is -1 (the slot the sibling
     * leader-change test fills with its leader epoch). For magic values below 2 the
     * constructor must throw; otherwise appending a leader change message must throw
     * {@link IllegalArgumentException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteLeaderChangeControlBatchWithoutLeaderEpoch(Args args) {
        ByteBuffer buffer = allocateBuffer(128, args);
        Supplier<MemoryRecordsBuilder> supplier = () -> new MemoryRecordsBuilder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, true, -1, buffer.capacity());
        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        // The id is an int that is actually passed on, replacing an unused boolean local
        // that sat next to a hard-coded literal 1.
        int leaderId = 1;
        MemoryRecordsBuilder builder = supplier.get();
        Assertions.assertThrows(IllegalArgumentException.class,
                () -> builder.appendLeaderChangeMessage(-1L, new LeaderChangeMessage().setLeaderId(leaderId).setVoters(Collections.emptyList())));
    }

    /**
     * Writes a leader change message (leader id 1, voters 2 and 3) through a builder created
     * with leader epoch 5. For magic values below 2 the constructor must throw; otherwise the
     * built records must hold exactly one record that deserializes back to the same leader id
     * and voter ids.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testWriteLeaderChangeControlBatch(Args args) {
        ByteBuffer buffer = allocateBuffer(128, args);
        // Both values are ints that are actually passed on below, instead of unused locals
        // sitting next to hard-coded literals.
        int leaderId = 1;
        int leaderEpoch = 5;
        List<Integer> voters = Arrays.asList(2, 3);
        Supplier<MemoryRecordsBuilder> supplier = () -> new MemoryRecordsBuilder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, true, leaderEpoch, buffer.capacity());
        if (args.magic < 2) {
            Assertions.assertThrows(IllegalArgumentException.class, supplier::get);
            return;
        }
        MemoryRecordsBuilder builder = supplier.get();
        builder.appendLeaderChangeMessage(-1L, new LeaderChangeMessage()
                .setLeaderId(leaderId)
                .setVoters(voters.stream()
                        .map(voterId -> new LeaderChangeMessage.Voter().setVoterId(voterId.intValue()))
                        .collect(Collectors.toList())));
        MemoryRecords memoryRecords = callBuildFn(builder, args.buildFn);
        // Typed list instead of a raw List, so the element needs no unchecked cast.
        List<Record> records = TestUtils.toList(memoryRecords.records());
        Assertions.assertEquals(1, records.size());
        LeaderChangeMessage leaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage(records.get(0));
        Assertions.assertEquals(leaderId, leaderChangeMessage.leaderId());
        Assertions.assertEquals(voters, leaderChangeMessage.voters().stream().map(LeaderChangeMessage.Voter::voterId).collect(Collectors.toList()));
    }

    /**
     * Appends three legacy records and compares the builder's reported compression ratio
     * with one computed from the built size. For magic values of 2 and above, creating the
     * legacy records themselves must throw {@link IllegalArgumentException}.
     *
     * NOTE(review): the literals 12 and 14/22 are fixed overhead sizes this test subtracts;
     * presumably the per-entry log overhead and the v0/v1 legacy record header sizes -
     * confirm against the record format constants.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testLegacyCompressionRate(Args args) {
        byte magic = args.magic;
        ByteBuffer buffer = allocateBuffer(1024, args);
        Supplier<LegacyRecord[]> recordFactory = () -> new LegacyRecord[]{
            LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes(), true),
            LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes(), true),
            LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes(), true)
        };
        if (magic >= 2) {
            Assertions.assertThrows(IllegalArgumentException.class, recordFactory::get);
            return;
        }

        LegacyRecord[] legacyRecords = recordFactory.get();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, buffer.capacity());
        int uncompressedSize = 0;
        for (LegacyRecord legacyRecord : legacyRecords) {
            uncompressedSize += legacyRecord.sizeInBytes() + 12;
            builder.append(legacyRecord);
        }
        MemoryRecords built = callBuildFn(builder, args.buildFn);

        if (args.compression.type() == CompressionType.NONE) {
            // Without compression the ratio is expected to be exactly 1 (within tolerance).
            Assertions.assertEquals(1.0, builder.compressionRatio(), 1.0E-5);
            return;
        }
        int recordHead = (magic == 0) ? 14 : 22;
        int compressedSize = built.sizeInBytes() - 12 - recordHead;
        double expectedRatio = (double) compressedSize / (double) uncompressedSize;
        Assertions.assertEquals(expectedRatio, builder.compressionRatio(), 1.0E-5);
    }

    /**
     * Appends ten records and checks that the size estimate grows strictly with every
     * append. After building, the estimate must equal the built size; without compression
     * the estimate taken just before building must equal it too.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testEstimatedSizeInBytes(Args args) {
        ByteBuffer target = allocateBuffer(1024, args);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());

        int lastEstimate = 0;
        for (int n = 0; n < 10; n++) {
            builder.append(new SimpleRecord((long) n, String.valueOf(n).getBytes()));
            int estimate = builder.estimatedSizeInBytes();
            Assertions.assertTrue(estimate > lastEstimate);
            lastEstimate = estimate;
        }

        int estimateBeforeBuild = builder.estimatedSizeInBytes();
        MemoryRecords built = callBuildFn(builder, args.buildFn);
        Assertions.assertEquals(built.sizeInBytes(), builder.estimatedSizeInBytes());
        if (args.compression.type() == CompressionType.NONE) {
            Assertions.assertEquals(built.sizeInBytes(), estimateBeforeBuild);
        }
    }

    /**
     * Builds three records with {@code LOG_APPEND_TIME} and checks the reported max timestamp,
     * the shallow offset of that max timestamp, and the timestamp type and per-record
     * timestamps of every resulting batch.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void buildUsingLogAppendTime(Args args) {
        byte magic = args.magic;
        ByteBuffer target = allocateBuffer(1024, args);
        long appendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, magic, args.compression, TimestampType.LOG_APPEND_TIME, 0L, appendTime, -1L, -1, -1, false, false, -1, target.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());
        builder.append(0L, "b".getBytes(), "2".getBytes());
        builder.append(0L, "c".getBytes(), "3".getBytes());
        MemoryRecords built = callBuildFn(builder, args.buildFn);

        MemoryRecordsBuilder.RecordsInfo info = builder.info();
        Assertions.assertEquals(appendTime, info.maxTimestamp);
        // Uncompressed batches with magic 0 or 1 are expected to report offset 0; every other
        // combination is expected to report the last offset (2).
        boolean uncompressedLegacy = args.compression.type() == CompressionType.NONE && magic <= 1;
        Assertions.assertEquals(uncompressedLegacy ? 0L : 2L, info.shallowOffsetOfMaxTimestamp);

        for (RecordBatch batch : built.batches()) {
            if (magic != 0) {
                Assertions.assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
                for (Record record : batch) {
                    Assertions.assertEquals(appendTime, record.timestamp());
                }
            } else {
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
            }
        }
    }

    /**
     * Builds three records with {@code CREATE_TIME} timestamps 0, 2 and 1 and checks the
     * reported max timestamp, the shallow offset of that max timestamp, and the timestamp
     * type and per-record timestamps of every resulting batch.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void buildUsingCreateTime(Args args) {
        byte magic = args.magic;
        ByteBuffer target = allocateBuffer(1024, args);
        long appendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, magic, args.compression, TimestampType.CREATE_TIME, 0L, appendTime, -1L, -1, -1, false, false, -1, target.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());
        builder.append(2L, "b".getBytes(), "2".getBytes());
        builder.append(1L, "c".getBytes(), "3".getBytes());
        MemoryRecords built = callBuildFn(builder, args.buildFn);

        MemoryRecordsBuilder.RecordsInfo info = builder.info();
        boolean magicZero = magic == 0;
        Assertions.assertEquals(magicZero ? -1L : 2L, info.maxTimestamp);

        long expectedShallowOffset;
        if (magicZero) {
            expectedShallowOffset = -1L;
        } else if (args.compression.type() == CompressionType.NONE && magic == 1) {
            // Offset of the record that carries the max timestamp (the second append).
            expectedShallowOffset = 1L;
        } else {
            expectedShallowOffset = 2L;
        }
        Assertions.assertEquals(expectedShallowOffset, info.shallowOffsetOfMaxTimestamp);

        long[] expectedTimestamps = {0L, 2L, 1L};
        int next = 0;
        for (RecordBatch batch : built.batches()) {
            if (magicZero) {
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
            } else {
                Assertions.assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
                for (Record record : batch) {
                    Assertions.assertEquals(expectedTimestamps[next++], record.timestamp());
                }
            }
        }
    }

    /**
     * Appends a single key/value record, builds, and checks that exactly one record comes
     * back. Despite the method name, no checksum is inspected in this method.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testAppendedChecksumConsistency(Args args) {
        ByteBuffer target = allocateBuffer(512, args);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, -1L, -1L, -1, -1, false, false, -1, target.capacity());
        builder.append(1L, "key".getBytes(), "value".getBytes());

        MemoryRecords built = callBuildFn(builder, args.buildFn);

        Assertions.assertEquals(1, TestUtils.toList(built.records()).size());
    }

    /**
     * With a write limit of 0, a fresh builder must still report room for a first record and
     * accept it; afterwards it must report itself full with no room for another. The built
     * records must contain exactly that one record with the original key and value.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testSmallWriteLimit(Args args) {
        byte[] key = "foo".getBytes();
        byte[] value = "bar".getBytes();
        int writeLimit = 0;
        ByteBuffer buffer = allocateBuffer(512, args);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, -1L, -1L, -1, -1, false, false, -1, writeLimit);

        Assertions.assertFalse(builder.isFull());
        Assertions.assertTrue(builder.hasRoomFor(0L, key, value, Record.EMPTY_HEADERS));
        builder.append(0L, key, value);
        Assertions.assertTrue(builder.isFull());
        Assertions.assertFalse(builder.hasRoomFor(0L, key, value, Record.EMPTY_HEADERS));

        MemoryRecords memoryRecords = callBuildFn(builder, args.buildFn);
        // Typed list instead of a raw List, so the element needs no unchecked cast.
        List<Record> records = TestUtils.toList(memoryRecords.records());
        Assertions.assertEquals(1, records.size());
        Record record = records.get(0);
        Assertions.assertEquals(ByteBuffer.wrap(key), record.key());
        Assertions.assertEquals(ByteBuffer.wrap(value), record.value());
    }

    /**
     * Fills a 64-byte-capacity builder with two records, checks that it reports no room for
     * a third, appends the third anyway, and verifies the resulting max-timestamp info and
     * per-record timestamps.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void writePastLimit(Args args) {
        byte magic = args.magic;
        ByteBuffer target = allocateBuffer(64, args);
        long appendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, magic, args.compression, TimestampType.CREATE_TIME, 0L, appendTime, -1L, -1, -1, false, false, -1, target.capacity());
        builder.setEstimatedCompressionRatio(0.5f);
        builder.append(0L, "a".getBytes(), "1".getBytes());
        builder.append(1L, "b".getBytes(), "2".getBytes());
        Assertions.assertFalse(builder.hasRoomFor(2L, "c".getBytes(), "3".getBytes(), Record.EMPTY_HEADERS));
        // Appending is still expected to succeed even though hasRoomFor said no.
        builder.append(2L, "c".getBytes(), "3".getBytes());
        MemoryRecords built = callBuildFn(builder, args.buildFn);

        MemoryRecordsBuilder.RecordsInfo info = builder.info();
        long expectedInfoValue = (magic == 0) ? -1L : 2L;
        Assertions.assertEquals(expectedInfoValue, info.shallowOffsetOfMaxTimestamp);
        Assertions.assertEquals(expectedInfoValue, info.maxTimestamp);

        long nextTimestamp = 0L;
        for (RecordBatch batch : built.batches()) {
            if (magic != 0) {
                Assertions.assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
                for (Record record : batch) {
                    Assertions.assertEquals(nextTimestamp++, record.timestamp());
                }
            } else {
                Assertions.assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
            }
        }
    }

    /**
     * Appends a record at offset 0 and checks that a second append at the same offset is
     * rejected with {@link IllegalArgumentException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testAppendAtInvalidOffset(Args args) {
        ByteBuffer target = allocateBuffer(1024, args);
        long appendTime = System.currentTimeMillis();
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, appendTime, -1L, -1, -1, false, false, -1, target.capacity());

        builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);

        Assertions.assertThrows(IllegalArgumentException.class,
                () -> builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null));
    }

    /**
     * After {@code abort()}, calling {@code build()} must throw {@link IllegalStateException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void shouldThrowIllegalStateExceptionOnBuildWhenAborted(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        MemoryRecordsBuilder aborted = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());
        aborted.abort();

        Assertions.assertThrows(IllegalStateException.class, aborted::build);
    }

    /**
     * After appending a record and then aborting, the builder's buffer position must be back
     * at the configured buffer offset.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void shouldResetBufferToInitialPositionOnAbort(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());

        builder.abort();

        int positionAfterAbort = builder.buffer().position();
        Assertions.assertEquals(args.bufferOffset, positionAfterAbort);
    }

    /**
     * After {@code abort()}, calling {@code close()} must throw {@link IllegalStateException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void shouldThrowIllegalStateExceptionOnCloseWhenAborted(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        MemoryRecordsBuilder aborted = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());
        aborted.abort();

        Assertions.assertThrows(IllegalStateException.class, aborted::close, "Should have thrown IllegalStateException");
    }

    /**
     * After {@code abort()}, appending a record must throw {@link IllegalStateException}.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void shouldThrowIllegalStateExceptionOnAppendWhenAborted(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        MemoryRecordsBuilder aborted = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());
        aborted.abort();

        Assertions.assertThrows(IllegalStateException.class,
                () -> aborted.append(0L, "a".getBytes(), "1".getBytes()),
                "Should have thrown IllegalStateException");
    }

    /**
     * Once the builder has been built, a further append must throw
     * {@link IllegalStateException} carrying the exact "closed for record appends" message.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void shouldThrowIllegalStateExceptionOnAppendWhenClosed(Args args) {
        ByteBuffer target = allocateBuffer(128, args);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(target, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, target.capacity());
        builder.append(0L, "a".getBytes(), "1".getBytes());
        callBuildFn(builder, args.buildFn);

        IllegalStateException thrown = Assertions.assertThrows(IllegalStateException.class,
                () -> builder.append(0L, "a".getBytes(), "1".getBytes()));
        Assertions.assertEquals("Tried to append a record, but MemoryRecordsBuilder is closed for record appends", thrown.getMessage());
    }

    /**
     * Builds three records (timestamps 50, 100 and 150; the second has a null value) with a
     * delete horizon of 100 and checks that the first batch reports that horizon and that the
     * streamed records keep their original timestamps.
     */
    @ParameterizedTest
    @ArgumentsSource(V2MemoryRecordsBuilderArgumentsProvider.class)
    public void testRecordTimestampsWithDeleteHorizon(Args args) {
        long deleteHorizon = 100L;
        int payloadLen = 1024 * 1024;
        ByteBuffer buffer = allocateBuffer(payloadLen * 2, args);
        ByteBufferOutputStream byteBufferOutputStream = new ByteBufferOutputStream(buffer);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBufferOutputStream, args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, 0, deleteHorizon, true);
        builder.append(50L, "0".getBytes(), "0".getBytes());
        builder.append(100L, "1".getBytes(), null);
        builder.append(150L, "2".getBytes(), "2".getBytes());
        MemoryRecords records = callBuildFn(builder, args.buildFn);

        // Typed list instead of a raw List, so the element needs no unchecked cast.
        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
        MutableRecordBatch firstBatch = batches.get(0);
        Assertions.assertEquals(OptionalLong.of(deleteHorizon), firstBatch.deleteHorizonMs());

        // try-with-resources closes the streaming iterator even when an assertion fails.
        try (CloseableIterator<Record> recordIterator = firstBatch.streamingIterator(BufferSupplier.create())) {
            for (long expectedTimestamp : new long[]{50L, 100L, 150L}) {
                Assertions.assertEquals(expectedTimestamp, recordIterator.next().timestamp());
            }
        }
    }

    /**
     * Runs the timestamp-validation scenario twice: first with validation enabled, then with
     * it disabled.
     */
    @ParameterizedTest
    @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
    public void testRecordTimestampValidation(Args args) {
        for (boolean validationEnabled : new boolean[]{true, false}) {
            testRecordTimestampValidation(args, validationEnabled);
        }
    }

    /**
     * Builds identical content twice, once with {@code build()} and once with
     * {@code buildWithClonedBuffer()}, and checks that the results start out equal. After
     * both source buffers are overwritten at the same place, the {@code build()} result must
     * still match its source buffer, while the cloned result must no longer match its source
     * buffer or the other result.
     */
    @ParameterizedTest
    @ArgumentsSource(NoBuildFnMemoryRecordsBuilderArgumentsProvider.class)
    public void testBuildWithClonedBuffer(Args args) {
        ByteBuffer sharedSource = allocateBuffer(1024, args);
        ByteBuffer clonedSource = allocateBuffer(1024, args);
        MemoryRecordsBuilder sharingBuilder = createBuilderAndAppendRecords(sharedSource, args);
        MemoryRecordsBuilder cloningBuilder = createBuilderAndAppendRecords(clonedSource, args);

        MemoryRecords shared = sharingBuilder.build();
        MemoryRecords cloned = cloningBuilder.buildWithClonedBuffer();
        Assertions.assertEquals(shared, cloned);

        // Overwrite the same four bytes in both source buffers.
        int overwriteAt = args.bufferOffset + 42;
        sharedSource.putInt(overwriteAt, 42);
        clonedSource.putInt(overwriteAt, 42);

        Assertions.assertEquals(sliceWithOffset(sharedSource, args.bufferOffset), shared.buffer());
        Assertions.assertNotEquals(sliceWithOffset(clonedSource, args.bufferOffset), cloned.buffer());
        Assertions.assertNotEquals(shared, cloned);
    }

    /**
     * Creates a builder that writes into {@code sourceBuffer} through a
     * {@link ByteBufferOutputStream} and appends four fixed records with timestamps
     * -1, 0, 1 and -1000.
     *
     * @param sourceBuffer buffer the builder writes into; its capacity is used as the write limit
     * @param args         supplies the magic value and compression for the builder
     * @return the builder, not yet built or closed
     */
    private static MemoryRecordsBuilder createBuilderAndAppendRecords(ByteBuffer sourceBuffer, Args args) {
        MemoryRecordsBuilder populated = new MemoryRecordsBuilder(new ByteBufferOutputStream(sourceBuffer), args.magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, sourceBuffer.capacity(), -1L, false);
        populated.append(-1L, "a".getBytes(), "1".getBytes());
        populated.append(0L, "b".getBytes(), "2".getBytes());
        populated.append(1L, "c".getBytes(), "3".getBytes());
        populated.append(-1000L, "d".getBytes(), "4".getBytes());
        return populated;
    }

    /**
     * Returns a slice of {@code sourceBuffer} covering the bytes from {@code offset} up to
     * the buffer's current position. The source buffer's own position and limit are left
     * untouched because the work is done on a duplicate.
     *
     * @param sourceBuffer buffer to take the view from
     * @param offset       absolute index at which the slice starts
     * @return a new buffer sharing content with {@code sourceBuffer}
     */
    private static ByteBuffer sliceWithOffset(ByteBuffer sourceBuffer, int offset) {
        ByteBuffer view = sourceBuffer.duplicate();
        // Capture the end of the written region before the position is moved.
        int writtenEnd = view.position();
        view.position(offset);
        view.limit(writtenEnd);
        return view.slice();
    }

    /**
     * Appends records with timestamps -1, 0 and 1, then tries timestamp -1000. With
     * validation enabled that last append must throw {@link IllegalArgumentException};
     * with it disabled the record must be written. The built records are then checked:
     * for magic 0 every record reports timestamp -1, otherwise the timestamps come back
     * exactly as appended.
     *
     * @param args              supplies magic, compression, buffer offset and build method
     * @param validationEnabled value passed as the builder's final constructor argument
     */
    private void testRecordTimestampValidation(Args args, boolean validationEnabled) {
        byte magic = args.magic;
        ByteBuffer buffer = allocateBuffer(1024, args);
        ByteBufferOutputStream outputStream = new ByteBufferOutputStream(buffer);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(outputStream, magic, args.compression, TimestampType.CREATE_TIME, 0L, 0L, -1L, -1, -1, false, false, -1, buffer.capacity(), -1L, validationEnabled);
        builder.append(-1L, "a".getBytes(), "1".getBytes());
        builder.append(0L, "b".getBytes(), "2".getBytes());
        builder.append(1L, "c".getBytes(), "3".getBytes());
        if (validationEnabled) {
            Assertions.assertThrows(IllegalArgumentException.class, () -> builder.append(-1000L, "d".getBytes(), "4".getBytes()));
        } else {
            builder.append(-1000L, "d".getBytes(), "4".getBytes());
        }
        MemoryRecords records = callBuildFn(builder, args.buildFn);

        // Typed iterator instead of a raw Iterator, so next() needs no unchecked cast.
        Iterator<Record> it = TestUtils.toList(records.records()).iterator();
        if (magic == 0) {
            while (it.hasNext()) {
                Assertions.assertEquals(-1L, it.next().timestamp());
            }
            return;
        }
        Assertions.assertEquals(-1L, it.next().timestamp());
        Assertions.assertEquals(0L, it.next().timestamp());
        Assertions.assertEquals(1L, it.next().timestamp());
        if (!validationEnabled) {
            Assertions.assertEquals(-1000L, it.next().timestamp());
        }
        Assertions.assertFalse(it.hasNext());
    }

    /**
     * Asserts that {@code processingStats} is consistent with the outcome of a conversion.
     *
     * <p>Checks that the stats object is non-null, that it reports {@code numRecordsConverted}
     * converted records and a non-negative conversion time, and that the temporary memory
     * figure matches the expectation for the given compression:
     * <ul>
     *   <li>compression type {@code NONE}, nothing converted: equals {@code finalBytes};</li>
     *   <li>compression type {@code NONE}, everything converted: equals
     *       {@code preConvertedBytes + finalBytes};</li>
     *   <li>compression type {@code NONE}, partially converted: strictly between
     *       {@code finalBytes} and {@code finalBytes + preConvertedBytes};</li>
     *   <li>any other compression type: strictly greater than {@code finalBytes - 12 - 14}.</li>
     * </ul>
     *
     * @param compression         compression used for the records
     * @param processingStats     stats to verify
     * @param numRecords          total number of records
     * @param numRecordsConverted number of records expected to have been converted
     * @param finalBytes          size in bytes of the final records
     * @param preConvertedBytes   size in bytes before conversion
     */
    private void verifyRecordsProcessingStats(Compression compression, RecordValidationStats processingStats, int numRecords, int numRecordsConverted, long finalBytes, long preConvertedBytes) {
        Assertions.assertNotNull(processingStats, "Records processing info is null");
        Assertions.assertEquals(numRecordsConverted, processingStats.numRecordsConverted());
        boolean timeRecorded = processingStats.conversionTimeNanos() >= 0L;
        Assertions.assertTrue(timeRecorded, "Processing time not recorded: " + processingStats);

        final long tempBytes = processingStats.temporaryMemoryBytes();

        if (compression.type() != CompressionType.NONE) {
            // NOTE(review): 12 and 14 look like the log overhead and the legacy v0 record
            // overhead; confirm against the constants in Records / LegacyRecord.
            final long compressedBytes = finalBytes - 12L - 14L;
            Assertions.assertTrue(tempBytes > compressedBytes, String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes));
            return;
        }

        if (numRecordsConverted == 0) {
            Assertions.assertEquals(finalBytes, tempBytes);
        } else if (numRecordsConverted == numRecords) {
            Assertions.assertEquals(preConvertedBytes + finalBytes, tempBytes);
        } else {
            boolean withinBounds = tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes;
            Assertions.assertTrue(withinBounds, String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes));
        }
    }

    /**
     * Allocates a heap buffer of {@code size} bytes whose position is pre-advanced to
     * {@code args.bufferOffset}.
     *
     * @param size capacity of the buffer in bytes
     * @param args supplies the initial position via {@code bufferOffset}
     * @return the newly allocated, pre-positioned buffer
     */
    private ByteBuffer allocateBuffer(int size, Args args) {
        // The cast keeps this compiling where Buffer.position(int) is declared to return Buffer.
        return (ByteBuffer) ByteBuffer.allocate(size).position(args.bufferOffset);
    }

    /**
     * Reflectively invokes the no-argument {@code buildFn} on {@code builder} and returns
     * its result as {@link MemoryRecords}.
     *
     * @param builder the builder to invoke the method on
     * @param buildFn the zero-argument method to call
     * @return the value returned by {@code buildFn}
     * @throws RuntimeException wrapping any {@link IllegalAccessException} or
     *                          {@link InvocationTargetException} raised by the invocation
     */
    private MemoryRecords callBuildFn(MemoryRecordsBuilder builder, Method buildFn) {
        final Object built;
        try {
            built = buildFn.invoke(builder);
        }
        catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
        return (MemoryRecords) built;
    }

    /**
     * Immutable bundle of parameters handed to each parameterized test: the initial
     * buffer offset, the compression to use, the record format magic byte, and the
     * builder method used to produce the final records (may be {@code null}).
     */
    public static class Args {
        final int bufferOffset;
        final Compression compression;
        final byte magic;
        final Method buildFn;

        /**
         * @param bufferOffset initial position of the test buffer
         * @param compression  compression applied by the builder
         * @param magic        record format magic value
         * @param buildFn      method used to build the records, or {@code null}
         */
        public Args(int bufferOffset, Compression compression, byte magic, Method buildFn) {
            this.bufferOffset = bufferOffset;
            this.compression = compression;
            this.magic = magic;
            this.buildFn = buildFn;
        }

        /**
         * Renders the parameters in a readable form; a {@code null} build function is
         * shown as the text {@code null}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("magic=").append(this.magic);
            text.append(", bufferOffset=").append(this.bufferOffset);
            text.append(", compression=").append(this.compression.type());
            text.append(", buildFn=");
            if (this.buildFn == null) {
                text.append("null");
            } else {
                text.append(this.buildFn.getName());
            }
            return text.toString();
        }
    }

    /**
     * Argument source that fixes the magic value to 2.
     *
     * <p>Yields one {@link Args} for every combination of buffer offset (0 and 15), every
     * {@link CompressionType} value, and every method returned by
     * {@code MemoryRecordsBuilderTest.getBuildFns()}.
     */
    private static class V2MemoryRecordsBuilderArgumentsProvider
    implements ArgumentsProvider {
        private V2MemoryRecordsBuilderArgumentsProvider() {
        }

        /**
         * Builds the full list of argument combinations eagerly and returns it as a stream.
         *
         * @param parameterDeclarations unused
         * @param context               unused
         * @return one single-element {@link Arguments} per combination, in nested-loop order
         */
        public Stream<? extends Arguments> provideArguments(ParameterDeclarations parameterDeclarations, ExtensionContext context) {
            List<Arguments> values = new ArrayList<>();
            for (int bufferOffset : Arrays.asList(0, 15)) {
                for (CompressionType type : CompressionType.values()) {
                    for (Method buildFn : MemoryRecordsBuilderTest.getBuildFns()) {
                        // The explicit (byte) cast is required: an int literal is not implicitly
                        // narrowed to byte when passed as a constructor argument.
                        Args args = new Args(bufferOffset, Compression.of(type).build(), (byte) 2, buildFn);
                        values.add(Arguments.of(args));
                    }
                }
            }
            return values.stream();
        }
    }

    /**
     * Argument source for tests that do not need a build function.
     *
     * <p>Yields one {@link Args} (with a {@code null} build function) for every combination
     * of buffer offset (0 and 15), every {@link CompressionType} value, and each magic value
     * paired with that compression type: only 2 for {@code ZSTD}, otherwise 0, 1 and 2.
     */
    private static class NoBuildFnMemoryRecordsBuilderArgumentsProvider
    implements ArgumentsProvider {
        private NoBuildFnMemoryRecordsBuilderArgumentsProvider() {
        }

        /** Returns the magic values exercised for {@code type}. */
        private static List<Byte> magicsFor(CompressionType type) {
            if (type == CompressionType.ZSTD) {
                return Collections.singletonList((byte) 2);
            }
            return Arrays.asList((byte) 0, (byte) 1, (byte) 2);
        }

        /**
         * Builds the full list of argument combinations eagerly and returns it as a stream.
         *
         * @param parameterDeclarations unused
         * @param context               unused
         * @return one single-element {@link Arguments} per combination, in nested-loop order
         */
        public Stream<? extends Arguments> provideArguments(ParameterDeclarations parameterDeclarations, ExtensionContext context) {
            List<Arguments> combinations = new ArrayList<>();
            for (int offset : Arrays.asList(0, 15)) {
                for (CompressionType compressionType : CompressionType.values()) {
                    for (byte magicValue : magicsFor(compressionType)) {
                        Args candidate = new Args(offset, Compression.of(compressionType).build(), magicValue, null);
                        combinations.add(Arguments.of(candidate));
                    }
                }
            }
            return combinations.stream();
        }
    }

    /**
     * Argument source covering the full parameter matrix.
     *
     * <p>Yields one {@link Args} for every combination of buffer offset (0 and 15), every
     * {@link CompressionType} value, each magic value paired with that compression type
     * (only 2 for {@code ZSTD}, otherwise 0, 1 and 2), and every method returned by
     * {@code MemoryRecordsBuilderTest.getBuildFns()}.
     */
    private static class MemoryRecordsBuilderArgumentsProvider
    implements ArgumentsProvider {
        private MemoryRecordsBuilderArgumentsProvider() {
        }

        /** Returns the magic values exercised for {@code type}. */
        private static List<Byte> magicsFor(CompressionType type) {
            if (type == CompressionType.ZSTD) {
                return Collections.singletonList((byte) 2);
            }
            return Arrays.asList((byte) 0, (byte) 1, (byte) 2);
        }

        /**
         * Builds the full list of argument combinations eagerly and returns it as a stream.
         *
         * @param parameterDeclarations unused
         * @param context               unused
         * @return one single-element {@link Arguments} per combination, in nested-loop order
         */
        public Stream<? extends Arguments> provideArguments(ParameterDeclarations parameterDeclarations, ExtensionContext context) {
            List<Arguments> combinations = new ArrayList<>();
            for (int offset : Arrays.asList(0, 15)) {
                for (CompressionType compressionType : CompressionType.values()) {
                    for (byte magicValue : magicsFor(compressionType)) {
                        for (Method buildMethod : MemoryRecordsBuilderTest.getBuildFns()) {
                            Args candidate = new Args(offset, Compression.of(compressionType).build(), magicValue, buildMethod);
                            combinations.add(Arguments.of(candidate));
                        }
                    }
                }
            }
            return combinations.stream();
        }
    }
}

