/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier;

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig$;
import kafka.server.epoch.LeaderEpochFileCache;
import kafka.tier.TierTestUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.TraversableLike;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u00055e\u0001B\t\u0013\u0001]AQ\u0001\n\u0001\u0005\u0002\u0015Bq\u0001\u000b\u0001C\u0002\u0013E\u0013\u0006\u0003\u00041\u0001\u0001\u0006IA\u000b\u0005\bc\u0001\u0011\r\u0011\"\u00013\u0011\u0019Y\u0004\u0001)A\u0005g!9A\b\u0001b\u0001\n\u0003i\u0004B\u0002#\u0001A\u0003%a\bC\u0004F\u0001\t\u0007I\u0011\u0001$\t\rE\u0003\u0001\u0015!\u0003H\u0011\u0015\u0011\u0006\u0001\"\u0011T\u0011\u0015A\u0007\u0001\"\u0011j\u0011\u0015q\u0007\u0001\"\u0001p\u0011\u001d\t9\u0003\u0001C\u0005\u0003SAq!a\u0011\u0001\t\u0013\t)\u0005C\u0004\u0002Z\u0001!I!a\u0017\t\u000f\u0005-\u0004\u0001\"\u0003\u0002n\t1C+[3s\u000bB|7\r[*uCR,'+\u001a<pYZLgn\u001a*fa2L7-\u0019;j_:$Vm\u001d;\u000b\u0005M!\u0012\u0001\u0002;jKJT\u0011!F\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001D\b\t\u00033qi\u0011A\u0007\u0006\u00037Q\t1!\u00199j\u0013\ti\"D\u0001\fJ]R,wM]1uS>tG+Z:u\u0011\u0006\u0014h.Z:t!\ty\"%D\u0001!\u0015\t\tC#A\u0003vi&d7/\u0003\u0002$A\t9Aj\\4hS:<\u0017A\u0002\u001fj]&$h\bF\u0001'!\t9\u0003!D\u0001\u0013\u0003-\u0011'o\\6fe\u000e{WO\u001c;\u0016\u0003)\u0002\"a\u000b\u0018\u000e\u00031R\u0011!L\u0001\u0006g\u000e\fG.Y\u0005\u0003_1\u00121!\u00138u\u00031\u0011'o\\6fe\u000e{WO\u001c;!\u0003\u0015!x\u000e]5d+\u0005\u0019\u0004C\u0001\u001b:\u001b\u0005)$B\u0001\u001c8\u0003\u0011a\u0017M\\4\u000b\u0003a\nAA[1wC&\u0011!(\u000e\u0002\u0007'R\u0014\u0018N\\4\u0002\rQ|\u0007/[2!\u0003\ri7oZ\u000b\u0002}A\u00191fP!\n\u0005\u0001c#!B!se\u0006L\bCA\u0016C\u0013\t\u0019EF\u0001\u0003CsR,\u0017\u0001B7tO\u0002\na!\u001a=ji\u0016$W#A$\u0011\u0005!{U\"A%\u000b\u0005)[\u0015AB1u_6L7M\u0003\u0002M\u001b\u0006Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u00059;\u0014\u0001B;uS2L!\u0001U%\u0003\u001b\u0005#x.\\5d\u0005>|G.Z1o\u0003\u001d)\u00070\u001b;fI\u0002\nQa]3u+B$\"\u0001V,\u0011\u0005-*\u0016B\u0001,-\u0005\u0011)f.\u001b;\t\u000baS\u0001\u0019A-\u0002\u0011Q,7\u000f^%oM>\u0004\"A\u00172\u000e\u0003mS!a\u0007/\u000b\u0005us\u0016a\u00026va&$XM\u001d\u0006\u0003?\u00
02\fQA[;oSRT\u0011!Y\u0001\u0004_J<\u0017BA2\\\u0005!!Vm\u001d;J]\u001a|\u0007F\u0001\u0006f!\tQf-\u0003\u0002h7\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002\u0011Q,\u0017M\u001d#po:$\u0012\u0001\u0016\u0015\u0003\u0017-\u0004\"A\u00177\n\u00055\\&!C!gi\u0016\u0014X)Y2i\u0003\u0005\"Xm\u001d;US\u0016\u00148\u000b^1uKJ+7\u000f^8sKR{'+\u001a9mS\u000e\fG/[8o)\t!\u0006\u000fC\u0003r\u0019\u0001\u0007!/\u0001\u0004rk>\u0014X/\u001c\t\u0003gjt!\u0001\u001e=\u0011\u0005UdS\"\u0001<\u000b\u0005]4\u0012A\u0002\u001fs_>$h(\u0003\u0002zY\u00051\u0001K]3eK\u001aL!AO>\u000b\u0005ed\u0003F\u0002\u0007~\u0003\u0017\ti\u0001E\u0002\u007f\u0003\u000fi\u0011a \u0006\u0005\u0003\u0003\t\u0019!\u0001\u0005qe>4\u0018\u000eZ3s\u0015\r\t)\u0001X\u0001\u0007a\u0006\u0014\u0018-\\:\n\u0007\u0005%qPA\u0006WC2,XmU8ve\u000e,\u0017aB:ue&twm\u001d\u0017\u0005\u0003\u001f\t\u0019\"\t\u0002\u0002\u0012\u0005\u0011!p[\u0011\u0003\u0003+\tQa\u001b:bMRDs\u0001DA\r\u0003C\t\u0019\u0003\u0005\u0003\u0002\u001c\u0005uQBAA\u0002\u0013\u0011\ty\"a\u0001\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\u0003oC6,\u0017EAA\u0013\u0003aYH-[:qY\u0006Lh*Y7f{:\nXo\u001c:v[vZ\b'`\u0001\u001bo\u0006LGOR8s\u0019><WI\u001c3PM\u001a\u001cX\r\u001e+p\u001b\u0006$8\r\u001b\u000b\b)\u0006-\u00121HA 
\u0011\u001d\ti#\u0004a\u0001\u0003_\t!AY\u0019\u0011\t\u0005E\u0012qG\u0007\u0003\u0003gQ1!!\u000e\u0015\u0003\u0019\u0019XM\u001d<fe&!\u0011\u0011HA\u001a\u0005-Y\u0015MZ6b\u0005J|7.\u001a:\t\u000f\u0005uR\u00021\u0001\u00020\u0005\u0011!M\r\u0005\u0007\u0003\u0003j\u0001\u0019\u0001\u0016\u0002\u0013A\f'\u000f^5uS>t\u0017AB4fi2{w\r\u0006\u0004\u0002H\u0005M\u0013q\u000b\t\u0005\u0003\u0013\ny%\u0004\u0002\u0002L)\u0019\u0011Q\n\u000b\u0002\u00071|w-\u0003\u0003\u0002R\u0005-#aC!cgR\u0014\u0018m\u0019;M_\u001eDq!!\u0016\u000f\u0001\u0004\ty#\u0001\u0004ce>\\WM\u001d\u0005\u0007\u0003\u0003r\u0001\u0019\u0001\u0016\u0002\u0015\u0015\u0004xn\u00195DC\u000eDW\r\u0006\u0003\u0002^\u0005%\u0004\u0003BA0\u0003Kj!!!\u0019\u000b\t\u0005\r\u00141G\u0001\u0006KB|7\r[\u0005\u0005\u0003O\n\tG\u0001\u000bMK\u0006$WM]#q_\u000eDg)\u001b7f\u0007\u0006\u001c\u0007.\u001a\u0005\b\u0003+z\u0001\u0019AA\u0018\u0003!\tw/Y5u\u0013N\u0013Fc\u0002+\u0002p\u0005\u0015\u0015\u0011\u0012\u0005\b\u0003c\u0002\u0002\u0019AA:\u0003\t!\b\u000f\u0005\u0003\u0002v\u0005\u0005UBAA<\u0015\u0011\tI(a\u001f\u0002\r\r|W.\\8o\u0015\r)\u0012Q\u0010\u0006\u0004\u0003\u007f\u0002\u0017AB1qC\u000eDW-\u0003\u0003\u0002\u0004\u0006]$A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\u001c\u0005\u0007\u0003\u000f\u0003\u0002\u0019\u0001\u0016\u0002\u00179,XNU3qY&\u001c\u0017m\u001d\u0005\b\u0003\u0017\u0003\u0002\u0019AA\u0018\u0003\u0019aW-\u00193fe\u0002")
public class TierEpochStateRevolvingReplicationTest
extends IntegrationTestHarness {
    private final int brokerCount;
    private final String topic;
    private final byte[] msg = new byte[1000];
    private final AtomicBoolean exited = new AtomicBoolean(false);

    @Override
    public int brokerCount() {
        return this.brokerCount;
    }

    public String topic() {
        return this.topic;
    }

    public byte[] msg() {
        return this.msg;
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        Exit.setExitProcedure((x$1, x$2) -> this.exited().set(true));
        super.setUp(testInfo);
    }

    @Override
    @AfterEach
    public void tearDown() {
        Assertions.assertFalse((boolean)this.exited().get());
        super.tearDown();
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testTierStateRestoreToReplication(String quorum) {
        Properties properties = new Properties();
        properties.put("confluent.tier.enable", "true");
        properties.put("segment.bytes", "2000");
        properties.put("retention.bytes", "-1");
        properties.put("confluent.tier.local.hotset.bytes", "0");
        properties.put("min.insync.replicas", "2");
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        int leaderId = ((Tuple2)this.createTopic(this.topic(), 1, 3, properties, this.createTopic$default$5(), this.createTopic$default$6()).head())._2$mcI$sp();
        KafkaBroker leader = (KafkaBroker)this.brokers().find((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$1(leaderId, x$3))).get();
        this.awaitISR(tp, 3, leader);
        KafkaProducer producer = this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), this.createProducer$default$3());
        this.brokers().foreach((Function1 & Serializable & scala.Serializable)b -> {
            TierTestUtils$.MODULE$.awaitTierTopicPartition(b, Predef$.MODULE$.int2Integer(0));
            return BoxedUnit.UNIT;
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_2 -> {
            int followerToShutdown = BoxesRunTime.unboxToInt((Object)((IterableLike)((TraversableLike)this.brokers().map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToInteger((int)TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$4(x$4)), Buffer$.MODULE$.canBuildFrom())).filter((Function1)(JFunction1.mcZI.sp & Serializable & scala.Serializable)x$5 -> x$5 != leaderId)).head());
            this.killBroker(followerToShutdown);
            this.awaitISR(tp, 2, leader);
            producer.send(new ProducerRecord(tp.topic(), Predef$.MODULE$.int2Integer(tp.partition()), null, (Object)this.msg())).get();
            LogManager qual$1 = leader.replicaManager().logManager();
            boolean x$2 = qual$1.getLog$default$2();
            long logEndPriorToProduce = ((AbstractLog)qual$1.getLog(tp, x$2).get()).localLogEndOffset();
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 4).foreach((Function1 & Serializable & scala.Serializable)_ -> TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$6(this, producer, tp, BoxesRunTime.unboxToInt((Object)_)));
            long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long l2 = 60000L;
            if (TestUtils$.MODULE$ == null) {
                throw null;
            }
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$7(leader, tp, logEndPriorToProduce)) {
                void waitUntilTrue_pause;
                void waitUntilTrue_waitTimeMs;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$8());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            this.restartDeadBrokers(this.restartDeadBrokers$default$1());
            this.awaitISR(tp, 3, leader);
            producer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
            this.brokers().foreach((Function1 & Serializable & scala.Serializable)broker -> {
                this.waitForLogEndOffsetToMatch(leader, broker, 0);
                return BoxedUnit.UNIT;
            });
            this.brokers().foreach((Function1 & Serializable & scala.Serializable)broker -> {
                TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$10(this, leader, broker);
                return BoxedUnit.UNIT;
            });
        });
    }

    /*
     * WARNING - void declaration
     */
    private void waitForLogEndOffsetToMatch(KafkaBroker b1, KafkaBroker b2, int partition) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = 60000L;
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!TierEpochStateRevolvingReplicationTest.$anonfun$waitForLogEndOffsetToMatch$1(this, b1, partition, b2)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)TierEpochStateRevolvingReplicationTest.$anonfun$waitForLogEndOffsetToMatch$2(this, b1, partition, b2));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private AbstractLog getLog(KafkaBroker broker, int partition) {
        LogManager qual$1 = broker.logManager();
        TopicPartition x$1 = new TopicPartition(this.topic(), partition);
        boolean x$2 = qual$1.getLog$default$2();
        return (AbstractLog)qual$1.getLog(x$1, x$2).orNull(Predef$.MODULE$.$conforms());
    }

    private LeaderEpochFileCache epochCache(KafkaBroker broker) {
        return (LeaderEpochFileCache)this.getLog(broker, 0).leaderEpochCache().get();
    }

    /*
     * WARNING - void declaration
     */
    private void awaitISR(TopicPartition tp, int numReplicas, KafkaBroker leader) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!TierEpochStateRevolvingReplicationTest.$anonfun$awaitISR$1(leader, tp, numReplicas)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)TierEpochStateRevolvingReplicationTest.$anonfun$awaitISR$3());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testTierStateRestoreToReplication$1(int leaderId$1, KafkaBroker x$3) {
        return x$3.config().brokerId() == leaderId$1;
    }

    public static final /* synthetic */ int $anonfun$testTierStateRestoreToReplication$4(KafkaBroker x$4) {
        return x$4.config().brokerId();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$testTierStateRestoreToReplication$6(TierEpochStateRevolvingReplicationTest $this, KafkaProducer producer$1, TopicPartition tp$1, int _) {
        return (RecordMetadata)producer$1.send(new ProducerRecord(tp$1.topic(), Predef$.MODULE$.int2Integer(tp$1.partition()), null, (Object)$this.msg())).get();
    }

    public static final /* synthetic */ boolean $anonfun$testTierStateRestoreToReplication$7(KafkaBroker leader$1, TopicPartition tp$1, long logEndPriorToProduce$1) {
        boolean x$4;
        LogManager qual$2 = leader$1.replicaManager().logManager();
        return ((AbstractLog)qual$2.getLog(tp$1, x$4 = qual$2.getLog$default$2()).get()).localLogStartOffset() > logEndPriorToProduce$1;
    }

    public static final /* synthetic */ String $anonfun$testTierStateRestoreToReplication$8() {
        return "timed out waiting for segment tiering and deletion";
    }

    public static final /* synthetic */ void $anonfun$testTierStateRestoreToReplication$10(TierEpochStateRevolvingReplicationTest $this, KafkaBroker leader$1, KafkaBroker broker) {
        Assertions.assertEquals((Object)$this.epochCache(broker).epochEntries(), (Object)$this.epochCache(leader$1).epochEntries());
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogEndOffsetToMatch$1(TierEpochStateRevolvingReplicationTest $this, KafkaBroker b1$1, int partition$1, KafkaBroker b2$1) {
        return $this.getLog(b1$1, partition$1).logEndOffset() == $this.getLog(b2$1, partition$1).logEndOffset();
    }

    public static final /* synthetic */ String $anonfun$waitForLogEndOffsetToMatch$2(TierEpochStateRevolvingReplicationTest $this, KafkaBroker b1$1, int partition$1, KafkaBroker b2$1) {
        return new StringBuilder(27).append("Logs didn't match ").append($this.getLog(b1$1, partition$1).logEndOffset()).append(" vs ").append($this.getLog(b2$1, partition$1).logEndOffset()).append(". ").append(b1$1.config().brokerId()).append(" v ").append(b2$1.config().brokerId()).toString();
    }

    public static final /* synthetic */ boolean $anonfun$awaitISR$2(int numReplicas$1, Partition x$6) {
        return x$6.inSyncReplicaIds().size() == numReplicas$1;
    }

    public static final /* synthetic */ boolean $anonfun$awaitISR$1(KafkaBroker leader$2, TopicPartition tp$2, int numReplicas$1) {
        return leader$2.replicaManager().onlinePartition(tp$2).exists((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)TierEpochStateRevolvingReplicationTest.$anonfun$awaitISR$2(numReplicas$1, x$6)));
    }

    public static final /* synthetic */ String $anonfun$awaitISR$3() {
        return "Timed out waiting for replicas to join ISR";
    }

    public TierEpochStateRevolvingReplicationTest() {
        this.brokerCount = 3;
        this.topic = "topic1";
        this.serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        this.serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), "5");
    }
}

