/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier;

import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import kafka.api.IntegrationTestHarness;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.tier.fetcher.MemoryTracker;
import kafka.tier.fetcher.TierFetcher;
import kafka.tier.state.TierPartitionState;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Some;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005mb\u0001B\u000b\u0017\u0001mAQA\t\u0001\u0005\u0002\rBQA\n\u0001\u0005R\u001dBQA\f\u0001\u0005\n=Bq\u0001\u000f\u0001C\u0002\u0013%\u0011\b\u0003\u0004>\u0001\u0001\u0006IA\u000f\u0005\b}\u0001\u0011\r\u0011\"\u0003(\u0011\u0019y\u0004\u0001)A\u0005Q!9\u0001\t\u0001a\u0001\n\u0013\t\u0005bB'\u0001\u0001\u0004%IA\u0014\u0005\u0007)\u0002\u0001\u000b\u0015\u0002\"\t\u000bU\u0003A\u0011\u0002,\t\u000f-\u0004!\u0019!C\u0001Y\"1q\u000f\u0001Q\u0001\n5DQ\u0001\u001f\u0001\u0005BeDa!!\u0003\u0001\t\u0003J\bbBA\n\u0001\u0011%\u0011Q\u0003\u0005\b\u0003?\u0001A\u0011BA\u0011\u0011\u001d\t9\u0003\u0001C\u0005\u0003SAa!a\f\u0001\t\u0013I\bBBA\u0019\u0001\u0011\u0005\u0011P\u0001\u0010US\u0016\u0014\u0018J\u001c;fOJ\fG/[8o)J\fgn]1di&|g\u000eV3ti*\u0011q\u0003G\u0001\u0005i&,'OC\u0001\u001a\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u000f\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0005}A\u0012aA1qS&\u0011\u0011E\b\u0002\u0017\u0013:$Xm\u001a:bi&|g\u000eV3ti\"\u000b'O\\3tg\u00061A(\u001b8jiz\"\u0012\u0001\n\t\u0003K\u0001i\u0011AF\u0001\fEJ|7.\u001a:D_VtG/F\u0001)!\tIC&D\u0001+\u0015\u0005Y\u0013!B:dC2\f\u0017BA\u0017+\u0005\rIe\u000e^\u0001\u000eG>tg-[4ve\u0016lunY6\u0016\u0003A\u0002\"!\r\u001c\u000e\u0003IR!a\r\u001b\u0002\t1\fgn\u001a\u0006\u0002k\u0005!!.\u0019<b\u0013\t9$G\u0001\u0004PE*,7\r^\u0001\u0006i>\u0004\u0018nY\u000b\u0002uA\u0011\u0011gO\u0005\u0003yI\u0012aa\u0015;sS:<\u0017A\u0002;pa&\u001c\u0007%\u0001\u0006qCJ$\u0018\u000e^5p]N\f1\u0002]1si&$\u0018n\u001c8tA\u0005\t\u0002/\u0019:uSRLwN\u001c+p\u0019\u0016\fG-\u001a:\u0016\u0003\t\u0003Ba\u0011&)Q9\u0011A\t\u0013\t\u0003\u000b*j\u0011A\u0012\u0006\u0003\u000fj\ta\u0001\u0010:p_Rt\u0014BA%+\u0003\u0019\u0001&/\u001a3fM&\u00111\n\u0014\u0002\u0004\u001b\u0006\u0004(BA%+\u0003U\u0001\u0018M\u001d;ji&|g\u000eV8MK\u0006$WM]0%KF$\"a\u0014*\u0011\u0005%\u0002\u0016BA)+\u0005\u0011)f.\u001b;\t\u000fMK\u0011\u0011!a\u0001\u0005\u0006\u0019\u0001\u0010J\u0019\u0002%A\f'\u000f^5uS>tGk\u001c'fC\u0012,'\u000fI\u0001\u0010i>\u0004\u0018n\u0019)beRLG/[8ogV\tq\u000bE\u0002Y;\u0002t!!W.\u000f\u0005\u0015S\u0016\"A\u0016\n\u0005qS\u0013a\u00029bG.\fw-Z\u0005\u0003=~\u00131aU3r\u0015\ta&\u0006\u0005\u0002bS6\t!M\u0003\u0002dI\u000611m\\7n_:T!!G3\u000b\u0005\u0019<\u0017AB1qC\u000eDWMC\u0001i\u0003\ry'oZ\u0005\u0003U\n\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.\u0001\u0004fq&$X\rZ\u000b\u0002[B\u0011a.^\u0007\u0002_*\u0011\u0001/]\u0001\u0007CR|W.[2\u000b\u0005I\u001c\u0018AC2p]\u000e,(O]3oi*\u0011A\u000fN\u0001\u0005kRLG.\u0003\u0002w_\ni\u0011\t^8nS\u000e\u0014un\u001c7fC:\fq!\u001a=ji\u0016$\u0007%A\u0003tKR,\u0006\u000fF\u0001PQ\tq1\u0010E\u0002}\u0003\u000bi\u0011! \u0006\u0003?yT1a`A\u0001\u0003\u001dQW\u000f]5uKJT1!a\u0001h\u0003\u0015QWO\\5u\u0013\r\t9! \u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$un\u001e8)\u0007=\ti\u0001E\u0002}\u0003\u001fI1!!\u0005~\u0005%\te\r^3s\u000b\u0006\u001c\u0007.\u0001\bqe>$WoY3SK\u000e|'\u000fZ:\u0015\u000b=\u000b9\"a\u0007\t\r\u0005e\u0001\u00031\u0001)\u0003!q')\u0019;dQ\u0016\u001c\bBBA\u000f!\u0001\u0007\u0001&A\bsK\u000e|'\u000fZ:QKJ\u0014\u0015\r^2i\u0003i9W\r\u001e'fC\u0012,'OR8s)>\u0004\u0018n\u0019)beRLG/[8o)\rA\u00131\u0005\u0005\u0007\u0003K\t\u0002\u0019\u00011\u0002)1,\u0017\rZ3s)>\u0004\u0018n\u0019)beRLG/[8o\u0003]9\u0018-\u001b;V]RLGnU3h[\u0016tGo\u001d+jKJ,G\rF\u0002P\u0003WAa!!\f\u0013\u0001\u0004A\u0013AD7j]:+XnU3h[\u0016tGo]\u0001\u0012g&lW\u000f\\1uKJ+G/\u001a8uS>t\u0017a\n;fgR\f%o\u00195jm\u0016\fe\u000e\u001a$fi\u000eD7+\u001b8hY\u0016$v\u000e]5d!\u0006\u0014H/\u001b;j_:D3\u0001FA\u001b!\ra\u0018qG\u0005\u0004\u0003si(\u0001\u0002+fgR\u0004")
public class TierIntegrationTransactionTest
extends IntegrationTestHarness {
    private final String topic;
    private final int partitions;
    private Map<Object, Object> partitionToLeader;
    private final AtomicBoolean exited;

    @Override
    public int brokerCount() {
        return 1;
    }

    private Object configureMock() {
        this.serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        return this.serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
    }

    private String topic() {
        return this.topic;
    }

    private int partitions() {
        return this.partitions;
    }

    private Map<Object, Object> partitionToLeader() {
        return this.partitionToLeader;
    }

    private void partitionToLeader_$eq(Map<Object, Object> x$1) {
        this.partitionToLeader = x$1;
    }

    private Seq<TopicPartition> topicPartitions() {
        return (Seq)package$.MODULE$.Range().apply(0, this.partitions()).map((Function1 & Serializable & scala.Serializable)p -> TierIntegrationTransactionTest.$anonfun$topicPartitions$1(this, BoxesRunTime.unboxToInt((Object)p)), IndexedSeq$.MODULE$.canBuildFrom());
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override
    @BeforeEach
    public void setUp() {
        Exit.setExitProcedure((x$1, x$2) -> this.exited().set(true));
        super.setUp();
        Properties props = new Properties();
        props.put("confluent.tier.enable", "true");
        props.put("segment.bytes", "10000");
        props.put("confluent.tier.local.hotset.bytes", "5000");
        props.put("retention.bytes", "-1");
        props.put("retention.ms", "-1");
        this.partitionToLeader_$eq(this.createTopic(this.topic(), this.partitions(), 1, props));
    }

    @Override
    @AfterEach
    public void tearDown() {
        super.tearDown();
        Assertions.assertFalse((boolean)this.exited().get());
    }

    private void produceRecords(int nBatches, int recordsPerBatch) {
        Properties props = new Properties();
        props.put("transactional.id", "1");
        ByteArraySerializer x$2 = this.createProducer$default$1();
        ByteArraySerializer x$3 = this.createProducer$default$2();
        producer.initTransactions();
        try (KafkaProducer producer = this.createProducer(x$2, x$3, props);){
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), nBatches).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)b -> {
                boolean abortBatch = b % 2 == 0;
                String key = abortBatch ? "aborted" : "committed";
                IndexedSeq producerRecords = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), recordsPerBatch).map((Function1 & Serializable & scala.Serializable)i -> TierIntegrationTransactionTest.$anonfun$produceRecords$2(this, recordsPerBatch, b, key, BoxesRunTime.unboxToInt((Object)i)), IndexedSeq$.MODULE$.canBuildFrom());
                producer.beginTransaction();
                producerRecords.foreach((Function1 & Serializable & scala.Serializable)record -> (RecordMetadata)producer.send(record).get(10L, TimeUnit.SECONDS));
                if (abortBatch) {
                    producer.abortTransaction();
                    return;
                }
                producer.commitTransaction();
            });
        }
    }

    private int getLeaderForTopicPartition(TopicPartition leaderTopicPartition) {
        return BoxesRunTime.unboxToInt((Object)this.partitionToLeader().apply((Object)BoxesRunTime.boxToInteger((int)leaderTopicPartition.partition())));
    }

    /*
     * WARNING - void declaration
     */
    private void waitUntilSegmentsTiered(int minNumSegments) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = 60000L;
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!TierIntegrationTransactionTest.$anonfun$waitUntilSegmentsTiered$1(this, minNumSegments)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)TierIntegrationTransactionTest.$anonfun$waitUntilSegmentsTiered$3(minNumSegments));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private void simulateRetention() {
        this.topicPartitions().foreach((Function1 & Serializable & scala.Serializable)tp -> {
            TierIntegrationTransactionTest.$anonfun$simulateRetention$1(this, tp);
            return BoxedUnit.UNIT;
        });
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testArchiveAndFetchSingleTopicPartition() {
        void var19_18;
        void var15_15;
        String[] attrs;
        String bean;
        int nBatches = 100;
        int recordsPerBatch = 100;
        this.produceRecords(nBatches, recordsPerBatch);
        this.waitUntilSegmentsTiered(10);
        this.simulateRetention();
        TopicPartition topicPartition = (TopicPartition)this.topicPartitions().head();
        String brokerList = TestUtils$.MODULE$.bootstrapServers((Seq<KafkaServer>)this.servers(), this.listenerName());
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", brokerList);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("max.poll.records", "50000");
        consumerProps.put("isolation.level", "read_committed");
        this.consumerConfig().setProperty("group.id", "foo");
        this.consumerConfig().setProperty("client.id", "foo");
        try (KafkaConsumer consumer = new KafkaConsumer(consumerProps);){
            ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
            partitions.add(topicPartition);
            partitions.add(new TopicPartition(this.topic(), 1));
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            ArrayList valuesRead = new ArrayList();
            while (valuesRead.size() < nBatches * recordsPerBatch / 2 - recordsPerBatch) {
                consumer.poll(Duration.ofMillis(1000L)).forEach((Consumer)new Consumer<ConsumerRecord<String, String>>(null, valuesRead){
                    private final ArrayList valuesRead$1;

                    public Consumer<ConsumerRecord<String, String>> andThen(Consumer<? super ConsumerRecord<String, String>> x$1) {
                        return Consumer.super.andThen(x$1);
                    }

                    public void accept(ConsumerRecord<String, String> r) {
                        Assertions.assertNotEquals((Object)r.key(), (Object)"aborted", (String)"did not expect to find any aborted records");
                        this.valuesRead$1.add(BoxesRunTime.boxToInteger((int)Integer.parseInt((String)r.value())));
                    }
                    {
                        this.valuesRead$1 = valuesRead$1;
                    }
                });
            }
        }
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        List list = ((TraversableOnce)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(mBeanServer.getAttributes(new ObjectName(bean = "kafka.server:type=TierFetcher"), attrs = (String[])((Object[])new String[]{"BytesFetchedTotal"})).asList()).asScala()).map((Function1 & Serializable & scala.Serializable)attr -> BoxesRunTime.boxToDouble((double)TierIntegrationTransactionTest.$anonfun$testArchiveAndFetchSingleTopicPartition$1(attr)), Buffer$.MODULE$.canBuildFrom())).toList();
        Some some = List$.MODULE$.unapplySeq((Seq)list);
        if (some.isEmpty() || some.get() == null || ((LinearSeqOptimized)some.get()).lengthCompare(1) != 0) {
            throw new MatchError((Object)list);
        }
        double bytesFetchedTotal = BoxesRunTime.unboxToDouble((Object)((LinearSeqOptimized)some.get()).apply(0));
        Assertions.assertTrue((var15_15 > 0.0 ? 1 : 0) != 0, (String)"tier fetch metric shows no data fetched from tiered storage");
        List list2 = ((TraversableOnce)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(mBeanServer.getAttributes(new ObjectName("kafka.tier.tasks.archive:type=TierArchiver,name=BytesPerSec"), (String[])((Object[])new String[]{"MeanRate"})).asList()).asScala()).map((Function1 & Serializable & scala.Serializable)attr -> BoxesRunTime.boxToDouble((double)TierIntegrationTransactionTest.$anonfun$testArchiveAndFetchSingleTopicPartition$2(attr)), Buffer$.MODULE$.canBuildFrom())).toList();
        Some some2 = List$.MODULE$.unapplySeq((Seq)list2);
        if (some2.isEmpty() || some2.get() == null || ((LinearSeqOptimized)some2.get()).lengthCompare(1) != 0) {
            throw new MatchError((Object)list2);
        }
        double meanArchiveRate = BoxesRunTime.unboxToDouble((Object)((LinearSeqOptimized)some2.get()).apply(0));
        Assertions.assertTrue((var19_18 > 0.0 ? 1 : 0) != 0, (String)"tier archiver mean rate shows no data uploaded to tiered storage");
        this.servers().foreach((Function1 & Serializable & scala.Serializable)server -> {
            TierIntegrationTransactionTest.$anonfun$testArchiveAndFetchSingleTopicPartition$3(server);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ TopicPartition $anonfun$topicPartitions$1(TierIntegrationTransactionTest $this, int p) {
        return new TopicPartition($this.topic(), p);
    }

    public static final /* synthetic */ ProducerRecord $anonfun$produceRecords$2(TierIntegrationTransactionTest $this, int recordsPerBatch$1, int b$1, String key$1, int i) {
        int m = recordsPerBatch$1 * b$1 + i;
        long timestamp = (long)b$1 + 1L * (long)i;
        return new ProducerRecord($this.topic(), null, Predef$.MODULE$.long2Long(timestamp), (Object)key$1.getBytes(StandardCharsets.UTF_8), (Object)String.valueOf(BoxesRunTime.boxToInteger((int)m)).getBytes(StandardCharsets.UTF_8));
    }

    public static final /* synthetic */ boolean $anonfun$waitUntilSegmentsTiered$2(TierIntegrationTransactionTest $this, int minNumSegments$1, TopicPartition tp) {
        int leaderId = $this.getLeaderForTopicPartition(tp);
        LogManager qual$1 = ((KafkaServer)$this.serverForId(leaderId).get()).logManager();
        boolean x$2 = qual$1.getLog$default$2();
        TierPartitionState tierPartitionState = ((AbstractLog)qual$1.getLog(tp, x$2).get()).tierPartitionState();
        long endOffset = tierPartitionState.endOffset();
        long committedEndOffset = tierPartitionState.committedEndOffset();
        return endOffset > 0L && committedEndOffset > 0L && endOffset == committedEndOffset && tierPartitionState.numSegments() > minNumSegments$1;
    }

    public static final /* synthetic */ boolean $anonfun$waitUntilSegmentsTiered$1(TierIntegrationTransactionTest $this, int minNumSegments$1) {
        return $this.topicPartitions().forall((Function1 & Serializable & scala.Serializable)tp -> BoxesRunTime.boxToBoolean((boolean)TierIntegrationTransactionTest.$anonfun$waitUntilSegmentsTiered$2($this, minNumSegments$1, tp)));
    }

    public static final /* synthetic */ String $anonfun$waitUntilSegmentsTiered$3(int minNumSegments$1) {
        return new StringBuilder(61).append("timeout waiting for at least ").append(minNumSegments$1).append(" to be archived and materialized").toString();
    }

    public static final /* synthetic */ void $anonfun$simulateRetention$1(TierIntegrationTransactionTest $this, TopicPartition tp) {
        boolean x$2;
        int leaderId = $this.getLeaderForTopicPartition(tp);
        LogManager qual$1 = ((KafkaServer)$this.serverForId(leaderId).get()).replicaManager().logManager();
        Assertions.assertTrue((((AbstractLog)qual$1.getLog(tp, x$2 = qual$1.getLog$default$2()).get()).deleteOldSegments() > 0 ? 1 : 0) != 0, (String)"tiered segments should have been deleted");
    }

    public static final /* synthetic */ double $anonfun$testArchiveAndFetchSingleTopicPartition$1(Attribute attr) {
        return BoxesRunTime.unboxToDouble((Object)attr.getValue());
    }

    public static final /* synthetic */ double $anonfun$testArchiveAndFetchSingleTopicPartition$2(Attribute attr) {
        return BoxesRunTime.unboxToDouble((Object)attr.getValue());
    }

    public static final /* synthetic */ void $anonfun$testArchiveAndFetchSingleTopicPartition$3(KafkaServer server) {
        MemoryTracker memoryTracker = ((TierFetcher)server.tierFetcherOpt().get()).memoryTracker();
        Assertions.assertEquals((long)0L, (long)memoryTracker.leased(), (String)new StringBuilder(54).append("expected leased TierFetcher memory for broker ").append(server.config().brokerId()).append(" to be 0").toString());
    }

    public TierIntegrationTransactionTest() {
        this.serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierMetadataReplicationFactorProp(), "1");
        this.serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), Integer.toString(Integer.MAX_VALUE));
        this.serverConfig().put(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        this.serverConfig().put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), "1");
        this.serverConfig().put(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), "1");
        this.configureMock();
        this.topic = UUID.randomUUID().toString();
        this.partitions = 1;
        this.partitionToLeader = (Map)Predef$.MODULE$.Map().apply((Seq)Nil$.MODULE$);
        this.exited = new AtomicBoolean(false);
    }
}

