/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier;

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.api.IntegrationTestHarness;
import kafka.cluster.Partition;
import kafka.log.AbstractLog;
import kafka.log.LogManager;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.epoch.LeaderEpochFileCache;
import kafka.tier.TierTestUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.TraversableLike;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005c\u0001B\t\u0013\u0001]AQ\u0001\n\u0001\u0005\u0002\u0015Bq\u0001\u000b\u0001C\u0002\u0013E\u0013\u0006\u0003\u00041\u0001\u0001\u0006IA\u000b\u0005\bc\u0001\u0011\r\u0011\"\u00013\u0011\u0019Y\u0004\u0001)A\u0005g!9A\b\u0001b\u0001\n\u0003i\u0004B\u0002#\u0001A\u0003%a\bC\u0004F\u0001\t\u0007I\u0011\u0001$\t\rE\u0003\u0001\u0015!\u0003H\u0011\u0015\u0011\u0006\u0001\"\u0011T\u0011\u0015\u0019\u0007\u0001\"\u0011T\u0011\u0015A\u0007\u0001\"\u0001T\u0011\u0015i\u0007\u0001\"\u0003o\u0011\u0015Y\b\u0001\"\u0003}\u0011\u001d\ti\u0001\u0001C\u0005\u0003\u001fAq!a\b\u0001\t\u0013\t\tC\u0001\u0014US\u0016\u0014X\t]8dQN#\u0018\r^3SKZ|GN^5oOJ+\u0007\u000f\\5dCRLwN\u001c+fgRT!a\u0005\u000b\u0002\tQLWM\u001d\u0006\u0002+\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\u0019=A\u0011\u0011\u0004H\u0007\u00025)\u00111\u0004F\u0001\u0004CBL\u0017BA\u000f\u001b\u0005YIe\u000e^3he\u0006$\u0018n\u001c8UKN$\b*\u0019:oKN\u001c\bCA\u0010#\u001b\u0005\u0001#BA\u0011\u0015\u0003\u0015)H/\u001b7t\u0013\t\u0019\u0003EA\u0004M_\u001e<\u0017N\\4\u0002\rqJg.\u001b;?)\u00051\u0003CA\u0014\u0001\u001b\u0005\u0011\u0012a\u00032s_.,'oQ8v]R,\u0012A\u000b\t\u0003W9j\u0011\u0001\f\u0006\u0002[\u0005)1oY1mC&\u0011q\u0006\f\u0002\u0004\u0013:$\u0018\u0001\u00042s_.,'oQ8v]R\u0004\u0013!\u0002;pa&\u001cW#A\u001a\u0011\u0005QJT\"A\u001b\u000b\u0005Y:\u0014\u0001\u00027b]\u001eT\u0011\u0001O\u0001\u0005U\u00064\u0018-\u0003\u0002;k\t11\u000b\u001e:j]\u001e\fa\u0001^8qS\u000e\u0004\u0013aA7tOV\ta\bE\u0002,\u007f\u0005K!\u0001\u0011\u0017\u0003\u000b\u0005\u0013(/Y=\u0011\u0005-\u0012\u0015BA\"-\u0005\u0011\u0011\u0015\u0010^3\u0002\t5\u001cx\rI\u0001\u0007KbLG/\u001a3\u0016\u0003\u001d\u0003\"\u0001S(\u000e\u0003%S!AS&\u0002\r\u0005$x.\\5d\u0015\taU*\u0001\u0006d_:\u001cWO\u001d:f]RT!AT\u001c\u0002\tU$\u0018\u000e\\\u0005\u0003!&\u0013Q\"\u0011;p[&\u001c'i\\8mK\u0006t\u0017aB3ySR,G\rI\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002)B\u00111&V\u0005\u0003-2\u0012A!\u00168ji\"\u0012!\u0002\u0017\t\u00033\u0006l\u0011A\u0017\u0006\u00037mS!\u0001X/\u0002\u000f),\b/\u001b;fe*\u0011alX\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002A\u0006\u0019qN]4\n\u0005\tT&A\u0003\"fM>\u0014X-R1dQ\u0006AA/Z1s\t><h\u000e\u000b\u0002\fKB\u0011\u0011LZ\u0005\u0003Oj\u0013\u0011\"\u00114uKJ,\u0015m\u00195\u0002CQ,7\u000f\u001e+jKJ\u001cF/\u0019;f%\u0016\u001cHo\u001c:f)>\u0014V\r\u001d7jG\u0006$\u0018n\u001c8)\u00051Q\u0007CA-l\u0013\ta'L\u0001\u0003UKN$\u0018AG<bSR4uN\u001d'pO\u0016sGm\u00144gg\u0016$Hk\\'bi\u000eDG\u0003\u0002+pofDQ\u0001]\u0007A\u0002E\f!AY\u0019\u0011\u0005I,X\"A:\u000b\u0005Q$\u0012AB:feZ,'/\u0003\u0002wg\nY1*\u00194lCN+'O^3s\u0011\u0015AX\u00021\u0001r\u0003\t\u0011'\u0007C\u0003{\u001b\u0001\u0007!&A\u0005qCJ$\u0018\u000e^5p]\u00061q-\u001a;M_\u001e$R!`A\u0004\u0003\u0017\u00012A`A\u0002\u001b\u0005y(bAA\u0001)\u0005\u0019An\\4\n\u0007\u0005\u0015qPA\u0006BEN$(/Y2u\u0019><\u0007BBA\u0005\u001d\u0001\u0007\u0011/\u0001\u0004ce>\\WM\u001d\u0005\u0006u:\u0001\rAK\u0001\u000bKB|7\r[\"bG\",G\u0003BA\t\u0003;\u0001B!a\u0005\u0002\u001a5\u0011\u0011Q\u0003\u0006\u0004\u0003/\u0019\u0018!B3q_\u000eD\u0017\u0002BA\u000e\u0003+\u0011A\u0003T3bI\u0016\u0014X\t]8dQ\u001aKG.Z\"bG\",\u0007BBA\u0005\u001f\u0001\u0007\u0011/\u0001\u0005bo\u0006LG/S*S)\u001d!\u00161EA\u001d\u0003{Aq!!\n\u0011\u0001\u0004\t9#\u0001\u0002uaB!\u0011\u0011FA\u001b\u001b\t\tYC\u0003\u0003\u0002.\u0005=\u0012AB2p[6|gNC\u0002\u0016\u0003cQ1!a\r`\u0003\u0019\t\u0007/Y2iK&!\u0011qGA\u0016\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:Da!a\u000f\u0011\u0001\u0004Q\u0013a\u00038v[J+\u0007\u000f\\5dCNDa!a\u0010\u0011\u0001\u0004\t\u0018A\u00027fC\u0012,'\u000f")
public class TierEpochStateRevolvingReplicationTest
extends IntegrationTestHarness {
    private final int brokerCount;
    private final String topic;
    private final byte[] msg = new byte[1000];
    private final AtomicBoolean exited = new AtomicBoolean(false);

    @Override
    public int brokerCount() {
        return this.brokerCount;
    }

    public String topic() {
        return this.topic;
    }

    public byte[] msg() {
        return this.msg;
    }

    public AtomicBoolean exited() {
        return this.exited;
    }

    @Override
    @BeforeEach
    public void setUp() {
        Exit.setExitProcedure((x$1, x$2) -> this.exited().set(true));
        super.setUp();
    }

    @Override
    @AfterEach
    public void tearDown() {
        Assertions.assertFalse((boolean)this.exited().get());
        super.tearDown();
    }

    @Test
    public void testTierStateRestoreToReplication() {
        Properties properties = new Properties();
        properties.put("confluent.tier.enable", "true");
        properties.put("segment.bytes", "2000");
        properties.put("retention.bytes", "-1");
        properties.put("confluent.tier.local.hotset.bytes", "0");
        properties.put("min.insync.replicas", "2");
        TopicPartition tp = new TopicPartition(this.topic(), 0);
        int leaderId = ((Tuple2)this.createTopic(this.topic(), 1, 3, properties).head())._2$mcI$sp();
        KafkaServer leader = (KafkaServer)this.servers().find((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$1(leaderId, x$3))).get();
        this.awaitISR(tp, 3, leader);
        KafkaProducer producer = this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), this.createProducer$default$3());
        this.servers().foreach((Function1 & Serializable & scala.Serializable)b -> {
            TierTestUtils$.MODULE$.awaitTierTopicPartition(b, Predef$.MODULE$.int2Integer(0));
            return BoxedUnit.UNIT;
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)_2 -> {
            int followerToShutdown = BoxesRunTime.unboxToInt((Object)((IterableLike)((TraversableLike)this.servers().map((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToInteger((int)TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$4(x$4)), Buffer$.MODULE$.canBuildFrom())).filter((Function1)(JFunction1.mcZI.sp & Serializable & scala.Serializable)x$5 -> x$5 != leaderId)).head());
            this.killBroker(followerToShutdown);
            this.awaitISR(tp, 2, leader);
            producer.send(new ProducerRecord(tp.topic(), Predef$.MODULE$.int2Integer(tp.partition()), null, (Object)this.msg())).get();
            LogManager qual$1 = leader.replicaManager().logManager();
            boolean x$2 = qual$1.getLog$default$2();
            long logEndPriorToProduce = ((AbstractLog)qual$1.getLog(tp, x$2).get()).localLogEndOffset();
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 4).foreach((Function1 & Serializable & scala.Serializable)_ -> TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$6(this, producer, tp, BoxesRunTime.unboxToInt((Object)_)));
            long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
            long l2 = 60000L;
            if (TestUtils$.MODULE$ == null) {
                throw null;
            }
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$7(leader, tp, logEndPriorToProduce)) {
                void waitUntilTrue_pause;
                void waitUntilTrue_waitTimeMs;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$8());
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            this.restartDeadBrokers(this.restartDeadBrokers$default$1());
            this.awaitISR(tp, 3, leader);
            producer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), null, (Object)this.msg())).get();
            this.servers().foreach((Function1 & Serializable & scala.Serializable)broker -> {
                this.waitForLogEndOffsetToMatch(leader, broker, 0);
                return BoxedUnit.UNIT;
            });
            this.servers().foreach((Function1 & Serializable & scala.Serializable)broker -> {
                TierEpochStateRevolvingReplicationTest.$anonfun$testTierStateRestoreToReplication$10(this, leader, broker);
                return BoxedUnit.UNIT;
            });
        });
    }

    /*
     * WARNING - void declaration
     */
    private void waitForLogEndOffsetToMatch(KafkaServer b1, KafkaServer b2, int partition) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = 60000L;
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!TierEpochStateRevolvingReplicationTest.$anonfun$waitForLogEndOffsetToMatch$1(this, b1, partition, b2)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)TierEpochStateRevolvingReplicationTest.$anonfun$waitForLogEndOffsetToMatch$2(this, b1, partition, b2));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    private AbstractLog getLog(KafkaServer broker, int partition) {
        LogManager qual$1 = broker.logManager();
        TopicPartition x$1 = new TopicPartition(this.topic(), partition);
        boolean x$2 = qual$1.getLog$default$2();
        return (AbstractLog)qual$1.getLog(x$1, x$2).orNull(Predef$.MODULE$.$conforms());
    }

    private LeaderEpochFileCache epochCache(KafkaServer broker) {
        return (LeaderEpochFileCache)this.getLog(broker, 0).leaderEpochCache().get();
    }

    /*
     * WARNING - void declaration
     */
    private void awaitISR(TopicPartition tp, int numReplicas, KafkaServer leader) {
        long l = TestUtils$.MODULE$.waitUntilTrue$default$4();
        long l2 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!TierEpochStateRevolvingReplicationTest.$anonfun$awaitISR$1(leader, tp, numReplicas)) {
            void waitUntilTrue_pause;
            void waitUntilTrue_waitTimeMs;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)TierEpochStateRevolvingReplicationTest.$anonfun$awaitISR$2());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper((long)waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testTierStateRestoreToReplication$1(int leaderId$1, KafkaServer x$3) {
        return x$3.config().brokerId() == leaderId$1;
    }

    public static final /* synthetic */ int $anonfun$testTierStateRestoreToReplication$4(KafkaServer x$4) {
        return x$4.config().brokerId();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$testTierStateRestoreToReplication$6(TierEpochStateRevolvingReplicationTest $this, KafkaProducer producer$1, TopicPartition tp$1, int _) {
        return (RecordMetadata)producer$1.send(new ProducerRecord(tp$1.topic(), Predef$.MODULE$.int2Integer(tp$1.partition()), null, (Object)$this.msg())).get();
    }

    public static final /* synthetic */ boolean $anonfun$testTierStateRestoreToReplication$7(KafkaServer leader$1, TopicPartition tp$1, long logEndPriorToProduce$1) {
        boolean x$4;
        LogManager qual$2 = leader$1.replicaManager().logManager();
        return ((AbstractLog)qual$2.getLog(tp$1, x$4 = qual$2.getLog$default$2()).get()).localLogStartOffset() > logEndPriorToProduce$1;
    }

    public static final /* synthetic */ String $anonfun$testTierStateRestoreToReplication$8() {
        return "timed out waiting for segment tiering and deletion";
    }

    public static final /* synthetic */ void $anonfun$testTierStateRestoreToReplication$10(TierEpochStateRevolvingReplicationTest $this, KafkaServer leader$1, KafkaServer broker) {
        Assertions.assertEquals((Object)$this.epochCache(broker).epochEntries(), (Object)$this.epochCache(leader$1).epochEntries());
    }

    public static final /* synthetic */ boolean $anonfun$waitForLogEndOffsetToMatch$1(TierEpochStateRevolvingReplicationTest $this, KafkaServer b1$1, int partition$1, KafkaServer b2$1) {
        return $this.getLog(b1$1, partition$1).logEndOffset() == $this.getLog(b2$1, partition$1).logEndOffset();
    }

    public static final /* synthetic */ String $anonfun$waitForLogEndOffsetToMatch$2(TierEpochStateRevolvingReplicationTest $this, KafkaServer b1$1, int partition$1, KafkaServer b2$1) {
        return new StringBuilder(27).append("Logs didn't match ").append($this.getLog(b1$1, partition$1).logEndOffset()).append(" vs ").append($this.getLog(b2$1, partition$1).logEndOffset()).append(". ").append(b1$1.config().brokerId()).append(" v ").append(b2$1.config().brokerId()).toString();
    }

    public static final /* synthetic */ boolean $anonfun$awaitISR$1(KafkaServer leader$2, TopicPartition tp$2, int numReplicas$1) {
        return ((Partition)leader$2.replicaManager().onlinePartition(tp$2).get()).inSyncReplicaIds().size() == numReplicas$1;
    }

    public static final /* synthetic */ String $anonfun$awaitISR$2() {
        return "Timed out waiting for replicas to join ISR";
    }

    public TierEpochStateRevolvingReplicationTest() {
        this.brokerCount = 3;
        this.topic = "topic1";
        this.serverConfig().put(KafkaConfig$.MODULE$.TierEnableProp(), "false");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierPartitionStateCommitIntervalProp(), "5");
        this.serverConfig().put(KafkaConfig$.MODULE$.TierMetadataNumPartitionsProp(), "1");
        this.serverConfig().put(KafkaConfig$.MODULE$.LogCleanupIntervalMsProp(), "5");
    }
}

