/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.availability.FilesWrapper;
import io.confluent.kafka.storage.checksum.Algorithm;
import io.confluent.kafka.storage.checksum.CheckedFileIO;
import java.io.File;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.AbstractLog;
import kafka.log.MergedLog$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.state.FileTierPartitionIterator;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.FileTierPartitionStateUploadObject;
import kafka.tier.state.TierPartitionState;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.metadata.FileTierPartitionStateRecoveryUploadMetadata;
import kafka.tier.store.objects.metadata.ObjectStoreMetadata;
import kafka.tier.store.objects.metadata.TierOffsetsRecoveryUploadMetadata;
import kafka.tier.store.objects.metadata.TierRecoveryUploadMetadata;
import kafka.tier.tools.TierRecoveryDataUploadCoordinator;
import kafka.tier.tools.TierRecoveryDataUploadJobStatus;
import kafka.tier.tools.TierRecoveryDataUploadResult;
import kafka.tier.tools.TierRecoveryUploadMetadataJson;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Seq$;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@Tag(value="integration")
@ScalaSignature(bytes="\u0006\u0005\tMb\u0001B\n\u0015\u0001mAQA\t\u0001\u0005\u0002\rBqA\n\u0001C\u0002\u0013\u0005q\u0005\u0003\u0004/\u0001\u0001\u0006I\u0001\u000b\u0005\b_\u0001\u0011\r\u0011\"\u00011\u0011\u0019I\u0004\u0001)A\u0005c!)!\b\u0001C!w!)\u0001\n\u0001C\u0001\u0013\")q\u000f\u0001C\u0005q\"9\u0011q\u0002\u0001\u0005\n\u0005E\u0001bBA\u0016\u0001\u0011%\u0011Q\u0006\u0005\b\u0003k\u0001A\u0011BA\u001c\u0011\u001d\tI\u0006\u0001C\u0005\u00037Bq!a\u001e\u0001\t\u0013\tI\bC\u0004\u0002\u0012\u0002!I!a%\t\u000f\u0005E\u0006\u0001\"\u0003\u00024\"9\u0011q\u0018\u0001\u0005\n\u0005\u0005\u0007bBAg\u0001\u0011%\u0011q\u001a\u0005\b\u0003s\u0004A\u0011BA~\u0005\u0015\"\u0016.\u001a:SK\u000e|g/\u001a:z\t\u0006$\u0018-\u00169m_\u0006$\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002\u0016-\u0005)Ao\\8mg*\u0011q\u0003G\u0001\u0005i&,'OC\u0001\u001a\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u000f\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0005}A\u0012aC5oi\u0016<'/\u0019;j_:L!!\t\u0010\u0003--\u000bgm[1TKJ4XM\u001d+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#\u0001\u0013\u0011\u0005\u0015\u0002Q\"\u0001\u000b\u0002\u00159,XN\u0011:pW\u0016\u00148/F\u0001)!\tIC&D\u0001+\u0015\u0005Y\u0013!B:dC2\f\u0017BA\u0017+\u0005\rIe\u000e^\u0001\f]Vl'I]8lKJ\u001c\b%A\bpm\u0016\u0014(/\u001b3j]\u001e\u0004&o\u001c9t+\u0005\t\u0004C\u0001\u001a8\u001b\u0005\u0019$B\u0001\u001b6\u0003\u0011)H/\u001b7\u000b\u0003Y\nAA[1wC&\u0011\u0001h\r\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\u0018\u0001E8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:!\u0003=9WM\\3sCR,7i\u001c8gS\u001e\u001cX#\u0001\u001f\u0011\u0007u\u0002%)D\u0001?\u0015\ty$&\u0001\u0006d_2dWm\u0019;j_:L!!\u0011 
\u0003\u0007M+\u0017\u000f\u0005\u0002D\r6\tAI\u0003\u0002F1\u000511/\u001a:wKJL!a\u0012#\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u001bi\u0016\u001cH\u000fV5feJ+7m\u001c<fef$\u0015\r^1Va2|\u0017\r\u001a\u000b\u0003\u00156\u0003\"!K&\n\u00051S#\u0001B+oSRDQAT\u0004A\u0002=\u000ba!];peVl\u0007C\u0001)X\u001d\t\tV\u000b\u0005\u0002SU5\t1K\u0003\u0002U5\u00051AH]8pizJ!A\u0016\u0016\u0002\rA\u0013X\rZ3g\u0013\tA\u0016L\u0001\u0004TiJLgn\u001a\u0006\u0003-*BCaB.hQB\u0011A,Z\u0007\u0002;*\u0011alX\u0001\u0007a\u0006\u0014\u0018-\\:\u000b\u0005\u0001\f\u0017a\u00026va&$XM\u001d\u0006\u0003E\u000e\fQA[;oSRT\u0011\u0001Z\u0001\u0004_J<\u0017B\u00014^\u0005E\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^\u0001\u0005]\u0006lW-I\u0001j\u0003\tZH-[:qY\u0006Lh*Y7f{:Z\u0018M]4v[\u0016tGo],ji\"t\u0015-\\3t{\"\"qa[9s!\taw.D\u0001n\u0015\tqW,\u0001\u0005qe>4\u0018\u000eZ3s\u0013\t\u0001XNA\u0006WC2,XmU8ve\u000e,\u0017aB:ue&twm\u001d\u0017\u0003gV\f\u0013\u0001^\u0001\u0003u.\f\u0013A^\u0001\u0006WJ\fg\r^\u0001\u0013i&,'\u000fU1si&$\u0018n\u001c8Ti\u0006$X\r\u0006\u0003z\u007f\u0006\r\u0001C\u0001>~\u001b\u0005Y(B\u0001?\u0017\u0003\u0015\u0019H/\u0019;f\u0013\tq8P\u0001\nUS\u0016\u0014\b+\u0019:uSRLwN\\*uCR,\u0007BBA\u0001\u0011\u0001\u0007\u0001&\u0001\u0005ce>\\WM]%e\u0011\u001d\t)\u0001\u0003a\u0001\u0003\u000f\t\u0001\u0003^8qS\u000eLE\rU1si&$\u0018n\u001c8\u0011\t\u0005%\u00111B\u0007\u0002-%\u0019\u0011Q\u0002\f\u0003!Q{\u0007/[2JIB\u000b'\u000f^5uS>t\u0017AB4fi2{w\r\u0006\u0004\u0002\u0014\u0005\u0015\u0012q\u0005\t\u0006S\u0005U\u0011\u0011D\u0005\u0004\u0003/Q#AB(qi&|g\u000e\u0005\u0003\u0002\u001c\u0005\u0005RBAA\u000f\u0015\r\ty\u0002G\u0001\u0004Y><\u0017\u0002BA\u0012\u0003;\u00111\"\u00112tiJ\f7\r\u001e'pO\"1\u0011\u0011A\u0005A\u0002!Bq!!\u000b\n\u0001\u0004\t9!\u0001\u0002ua\u0006y\u0012\r\u001d9f]\u0012$v\u000eV8qS\u000e\fe\u000eZ,bSR,f\u000e^5m)&,'/\u001a3\u0015\u000b)\u000by#a\r\t\r\u0005E\"\u00021\u0001)\u0003-!x\u000e]5d\u0019\u0016\fG-\u001a:\t\u000f\u0005\u0015!\u0002
1\u0001\u0002\b\u0005qq-\u001a;US\u0016\u0014xJ\u001a4tKR\u001cHCAA\u001d!\u001d\u0011\u00141HA \u0003\u0017J1!!\u00104\u0005\u001dA\u0015m\u001d5NCB\u0004B!!\u0011\u0002H5\u0011\u00111\t\u0006\u0004\u0003\u000b*\u0014\u0001\u00027b]\u001eLA!!\u0013\u0002D\t9\u0011J\u001c;fO\u0016\u0014\bC\u0002\u001a\u0002<=\u000bi\u0005\u0005\u0003\u0002P\u0005USBAA)\u0015\r\t\u0019&N\u0001\u0004]&|\u0017\u0002BA,\u0003#\u0012!BQ=uK\n+hMZ3s\u0003q!\u0018.\u001a:QCJ$\u0018\u000e^5p]N#\u0018\r^3Va2|\u0017\r\u001a(b[\u0016$\"\"!\u0018\u0002`\u00055\u0014\u0011OA;!\u0011I\u0013QC(\t\u000f\u0005\u0005D\u00021\u0001\u0002d\u0005)1\u000f^8sKB!\u0011QMA5\u001b\t\t9GC\u0002\u0002bYIA!a\u001b\u0002h\tyA+[3s\u001f\nTWm\u0019;Ti>\u0014X\r\u0003\u0004\u0002p1\u0001\raT\u0001\u000bS\u0012,g\u000e^5gS\u0016\u0014\bbBA:\u0019\u0001\u0007\u0011qH\u0001\u0007EJ|7.\u001a:\t\u000f\u0005\u0015A\u00021\u0001\u0002\b\u0005ABm\\<oY>\fGMU3d_Z,'/_'fi\u0006$\u0017\r^1\u0015\u0015\u0005m\u0014\u0011QAB\u0003\u000b\u000b9\tE\u0002&\u0003{J1!a \u0015\u0005y!\u0016.\u001a:SK\u000e|g/\u001a:z+Bdw.\u00193NKR\fG-\u0019;b\u0015N|g\u000eC\u0004\u0002b5\u0001\r!a\u0019\t\r\u0005=T\u00021\u0001P\u0011\u001d\t\u0019(\u0004a\u0001\u0003\u007fAq!!#\u000e\u0001\u0004\tY)A\tu_BL7-\u00133QCJ$\u0018\u000e^5p]N\u0004RAMAG\u0003\u000fI1!a$4\u0005\u001dA\u0015m\u001d5TKR\f\u0001D^1mS\u0012\fG/\u001a*fG>4XM]=NKR\fG-\u0019;b)%Q\u0015QSAM\u0003S\u000by\u000bC\u0004\u0002\u0018:\u0001\r!a\u001f\u0002\u00195,G/\u00193bi\u0006T5o\u001c8\t\u000f\u0005me\u00021\u0001\u0002\u001e\u00061Bo\u001c9jG&#\u0007+\u0019:uSRLwN\u001c+p!\u0006$\b\u000eE\u0004\u0002 
\u0006\u0015\u0016qA(\u000e\u0005\u0005\u0005&bAAR}\u00059Q.\u001e;bE2,\u0017\u0002BAT\u0003C\u00131!T1q\u0011\u001d\tYK\u0004a\u0001\u0003[\u000b\u0001\u0004^8qS\u000eLE\rU1si&$\u0018n\u001c8U_2+\u0017\rZ3s!!\ty*!*\u0002\b\u0005}\u0002bBA:\u001d\u0001\u0007\u0011qH\u0001\u0014I><h\u000e\\8bIRKWM](gMN,Go\u001d\u000b\u000b\u0003\u001b\n),a.\u0002<\u0006u\u0006bBA1\u001f\u0001\u0007\u00111\r\u0005\u0007\u0003s{\u0001\u0019A(\u0002\r1|w\rR5s\u0011\u0019\tyg\u0004a\u0001\u001f\"9\u00111O\bA\u0002\u0005}\u0012a\u0005<bY&$\u0017\r^3US\u0016\u0014xJ\u001a4tKR\u001cHc\u0002&\u0002D\u0006\u001d\u00171\u001a\u0005\b\u0003\u000b\u0004\u0002\u0019AA\u001d\u0003-!\u0018.\u001a:PM\u001a\u001cX\r^:\t\u000f\u0005%\u0007\u00031\u0001\u0002L\u0005\u0019R\u000f\u001d7pC\u0012,G\rV5fe>3gm]3ug\"9\u00111\u000f\tA\u0002\u0005}\u0012A\u00063po:dw.\u00193G)B\u001bV\u000b\u001d7pC\u0012\u0014En\u001c2\u0015\u0019\u0005E\u0017Q\\Ap\u0003g\f)0a>\u0011\t\u0005M\u0017\u0011\\\u0007\u0003\u0003+TA!a6\u0002R\u0005!a-\u001b7f\u0013\u0011\tY.!6\u0003\tA\u000bG\u000f\u001b\u0005\b\u0003C\n\u0002\u0019AA2\u0011\u001d\t\t/\u0005a\u0001\u0003G\fa\"\u001e9m_\u0006$W*\u001a;bI\u0006$\u0018\r\u0005\u0003\u0002f\u0006=XBAAt\u0015\u0011\tI/a;\u0002\u00115,G/\u00193bi\u0006TA!!<\u0002h\u00059qN\u00196fGR\u001c\u0018\u0002BAy\u0003O\u0014AFR5mKRKWM\u001d)beRLG/[8o'R\fG/\u001a*fG>4XM]=Va2|\u0017\rZ'fi\u0006$\u0017\r^1\t\r\u0005=\u0014\u00031\u0001P\u0011\u001d\t\u0019(\u0005a\u0001\u0003\u007fAq!!\u0002\u0012\u0001\u0004\t9!\u0001\u0012d_6\u0004\u0018M]3Va2|\u0017\rZ!oI\u001acWo\u001d5fI\u001a#\u0006kU#oiJLWm\u001d\u000b\b\u0015\u0006u(1\u0004B\u0010\u0011\u001d\tyP\u0005a\u0001\u0005\u0003\tqB\u001a;qg\u001aKG.Z\"iC:tW\r\u001c\t\u0005\u0005\u0007\u00119\"\u0004\u0002\u0003\u0006)!!q\u0001B\u0005\u0003!\u0019\u0007.Z2lgVl'\u0002\u0002B\u0006\u0005\u001b\tqa\u001d;pe\u0006<WMC\u0002\u001a\u0005\u001fQAA!\u0005\u0003\u0014\u0005I1m\u001c8gYV,g\u000e\u001e\u0006\u0003\u0005+\t!![8\n\t\te!Q\u0001\u0002\u000e\u0007\
",7m[3e\r&dW-S(\t\u000f\tu!\u00031\u0001\u0003\u0002\u0005\tR\u000f\u001d7pC\u00124\u0015\u000e\\3DQ\u0006tg.\u001a7\t\u000f\u0005\u0015!\u00031\u0001\u0002\b!:\u0001Aa\t\u00030\tE\u0002\u0003\u0002B\u0013\u0005Wi!Aa\n\u000b\u0007\t%r,A\u0002ba&LAA!\f\u0003(\t\u0019A+Y4\u0002\u000bY\fG.^3\"\u0003}\u0001")
public class TierRecoveryDataUploadIntegrationTest
extends KafkaServerTestHarness {
    /** Number of brokers requested from the harness; assigned once by the constructor. */
    private final int numBrokers;
    /** Broker property overrides; filled in by the constructor and passed to every generated config. */
    private final Properties overridingProps = new Properties();

    /**
     * @return the broker count used when generating broker configs
     */
    public int numBrokers() {
        return numBrokers;
    }

    /**
     * @return the live (mutable) override properties shared by all generated broker configs
     */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Builds one {@link KafkaConfig} per broker by creating the default broker properties for
     * {@link #numBrokers()} brokers and layering {@link #overridingProps()} on top of each.
     *
     * @return the broker configurations used by the test harness
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        Properties overrides = this.overridingProps();
        // Option.empty() replaces casts of None$ to Option<T>, which javac rejects because
        // None$ is a final Option<Nothing$>. The lambda target is a raw Function1, so its
        // parameter is an Object and has to be narrowed to Properties explicitly.
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(this.numBrokers(), this.zkConnectOrNull(), true, true, Option.<SecurityProtocol>empty(), Option.<File>empty(), Option.<Properties>empty(), true, false, false, false, (Map<Object, String>)((Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$)), 1, false, 1, (short)1, 0, false).map((Function1 & Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps((Properties)brokerProps, overrides, true));
    }

    @ParameterizedTest(name="{displayName}.{argumentsWithNames}")
    @ValueSource(strings={"zk", "kraft"})
    public void testTierRecoveryDataUpload(String quorum) {
        int numTopics = 4;
        Properties props = new Properties();
        props.put("confluent.tier.enable", "true");
        props.put("segment.bytes", "2048");
        props.put("confluent.tier.local.hotset.ms", "1");
        String topicPrefix = "foo-";
        HashSet topicIdPartitions = new HashSet();
        scala.collection.mutable.HashMap topicIdPartitionToLeader = new scala.collection.mutable.HashMap();
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), numTopics).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> {
            String topic = new StringBuilder(0).append(topicPrefix).append(i).toString();
            int topicLeader = BoxesRunTime.unboxToInt((Object)this.createTopic(topic, 1, this.numBrokers(), props, this.createTopic$default$5(), this.createTopic$default$6()).apply((Object)BoxesRunTime.boxToInteger((int)0)));
            UUID topicId = CoreUtils$.MODULE$.toJavaUUID((Uuid)this.getTopicIds((Seq<String>)new .colon.colon((Object)topic, (List)Nil$.MODULE$)).apply((Object)topic));
            TopicIdPartition topicIdPartition = new TopicIdPartition(topic, topicId, 0);
            topicIdPartitions.add(topicIdPartition);
            topicIdPartitionToLeader.put((Object)topicIdPartition, (Object)Predef$.MODULE$.int2Integer(topicLeader));
            this.waitUntilLogCreatedOnBrokers(topicIdPartition.topicPartition());
            this.appendToTopicAndWaitUntilTiered(topicLeader, topicIdPartition);
        });
        HashMap<Integer, HashMap<String, ByteBuffer>> tierOffsets = this.getTierOffsets();
        Buffer<KafkaBroker> replicas = this.brokers();
        String identifier = new StringBuilder(5).append("rcca-").append(new Random().nextInt(10000)).toString();
        ObjectRef jobIds = ObjectRef.create((Object)((scala.collection.mutable.Seq)Seq$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$)));
        ObjectRef coordinators = ObjectRef.create((Object)((scala.collection.mutable.Seq)Seq$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$)));
        replicas.foreach((Function1 & Serializable)replica -> {
            TierRecoveryDataUploadIntegrationTest.$anonfun$testTierRecoveryDataUpload$2(topicIdPartitions, identifier, jobIds, coordinators, replica);
            return BoxedUnit.UNIT;
        });
        replicas.indices().foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> {
            KafkaBroker replica = (KafkaBroker)replicas.apply(i);
            int brokerId = replica.config().brokerId();
            TierRecoveryDataUploadCoordinator coordinator = (TierRecoveryDataUploadCoordinator)((scala.collection.mutable.Seq)coordinators$1.elem).apply(i);
            UUID jobId = (UUID)((scala.collection.mutable.Seq)jobIds$1.elem).apply(i);
            long l = 100L;
            long waitUntilTrue_waitTimeMs = 15000L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!TierRecoveryDataUploadIntegrationTest.$anonfun$testTierRecoveryDataUpload$4(coordinator, jobId)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)"Timed out waiting for job to complete");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            TierRecoveryDataUploadResult res = coordinator.getJobResult(jobId);
            Assertions.assertEquals(new HashMap(), (Object)res.failedPartitions());
            Assertions.assertTrue((boolean)res.metadataUploadCompleted());
            Assertions.assertTrue((boolean)res.tierOffsetsUploadCompleted());
            scala.collection.mutable.HashMap topicIdPartitionToPath = new scala.collection.mutable.HashMap();
            topicIdPartitions.forEach(topicIdPartition -> {
                TierObjectStore store = (TierObjectStore)replica.tierObjectStoreOpt().get();
                Option<String> uploadFullName = this.tierPartitionStateUploadName(store, identifier, Predef$.MODULE$.int2Integer(brokerId), (TopicIdPartition)topicIdPartition);
                Predef$.MODULE$.assert(uploadFullName.isDefined(), (Function0 & Serializable)() -> "Upload file not present at the object store");
                FileTierPartitionStateRecoveryUploadMetadata uploadMetadata = FileTierPartitionStateRecoveryUploadMetadata.fromPath((String)((String)uploadFullName.get()));
                topicIdPartitionToPath.put(topicIdPartition, uploadFullName.get());
                Path downloadedFtpsPath = this.downloadFTPSUploadBlob(store, uploadMetadata, identifier, Predef$.MODULE$.int2Integer(brokerId), (TopicIdPartition)topicIdPartition);
                File logDir = this.tierPartitionState(brokerId, (TopicIdPartition)topicIdPartition).dir();
                String basePath = MergedLog$.MODULE$.tierStateFile(logDir, 0L, "").toString();
                Algorithm algorithm = Algorithm.NO_CHECKSUM;
                if (replica.config().confluentConfig().tierChecksumFeatureEnabled()) {
                    algorithm = Algorithm.ADLER;
                }
                CheckedFileIO ftpsFileChannel = CheckedFileIO.open((Path)FileTierPartitionState.flushedFilePath((String)basePath, (Algorithm)algorithm), (OpenOption[])new OpenOption[]{StandardOpenOption.READ});
                CheckedFileIO uploadFileChannel = CheckedFileIO.open((Path)downloadedFtpsPath, (OpenOption[])new OpenOption[]{StandardOpenOption.READ});
                this.compareUploadAndFlushedFTPSEntries(ftpsFileChannel, uploadFileChannel, (TopicIdPartition)topicIdPartition);
            });
            TierRecoveryUploadMetadataJson metadataJson = this.downloadRecoveryMetadata((TierObjectStore)replica.tierObjectStoreOpt().get(), identifier, Predef$.MODULE$.int2Integer(brokerId), topicIdPartitions);
            this.validateRecoveryMetadata(metadataJson, (scala.collection.mutable.Map<TopicIdPartition, String>)topicIdPartitionToPath, (scala.collection.mutable.Map<TopicIdPartition, Integer>)topicIdPartitionToLeader, Predef$.MODULE$.int2Integer(brokerId));
            HashMap<String, ByteBuffer> uploadedTierOffsets = new HashMap<String, ByteBuffer>();
            replica.config().logDirs().foreach((Function1 & Serializable)logDir -> {
                ByteBuffer logDirTierOffsets = this.downloadTierOffsets((TierObjectStore)replica.tierObjectStoreOpt().get(), (String)logDir, identifier, Predef$.MODULE$.int2Integer(brokerId));
                return uploadedTierOffsets.put((String)logDir, logDirTierOffsets);
            });
            this.validateTierOffsets(tierOffsets, uploadedTierOffsets, Predef$.MODULE$.int2Integer(replica.config().brokerId()));
            replica.config().logDirs().foreach((Function1 & Serializable)logDir -> {
                topicIdPartitions.forEach(topicIdPartition -> FilesWrapper.newDirectoryStream((Path)Paths.get(logDir, new String[0]).resolve(topicIdPartition.topicPartition().toString())).forEach(path -> {
                    if (FileTierPartitionStateUploadObject.isRecoveryUploadFile((String)((Object)path).toString())) {
                        Assertions.fail((String)new StringBuilder(46).append("Local staged ftps upload file was not deleted ").append(((Object)path).toString()).toString());
                        return;
                    }
                }));
                return BoxedUnit.UNIT;
            });
        });
        TierRecoveryDataUploadCoordinator coordinator = (TierRecoveryDataUploadCoordinator)((scala.collection.mutable.Seq)coordinators.elem).head();
        UUID newJobId = coordinator.initiateTierRecoveryDataUpload(topicIdPartitions, "rcca-5678", 2);
        Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.RUNNING, (Object)coordinator.getJobResult(newJobId).status());
        replicas.indices().foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> {
            TierRecoveryDataUploadCoordinator coordinator = (TierRecoveryDataUploadCoordinator)((scala.collection.mutable.Seq)coordinators$1.elem).apply(i);
            UUID jobId = (UUID)((scala.collection.mutable.Seq)jobIds$1.elem).apply(i);
            Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.COMPLETED, (Object)coordinator.getJobResult(jobId).status());
            TierRecoveryDataUploadResult res = coordinator.getJobResult(jobId);
            Assertions.assertEquals(new HashMap(), (Object)res.failedPartitions());
            Assertions.assertTrue((boolean)res.metadataUploadCompleted());
            Assertions.assertTrue((boolean)res.tierOffsetsUploadCompleted());
        });
    }

    /**
     * Looks up the tier partition state of a partition's log on the given broker.
     * Fails with the exception raised by {@code Option.get()} if the broker has no such log.
     *
     * @param brokerId id of the broker whose log is inspected
     * @param topicIdPartition partition to look up
     * @return the tier partition state attached to that log
     */
    private TierPartitionState tierPartitionState(int brokerId, TopicIdPartition topicIdPartition) {
        Option<AbstractLog> maybeLog = this.getLog(brokerId, topicIdPartition);
        AbstractLog partitionLog = (AbstractLog)maybeLog.get();
        return partitionLog.tierPartitionState();
    }

    /**
     * Fetches the log of a partition from the given broker's replica manager.
     * Fails with the exception raised by {@code Option.get()} if no broker has that id.
     *
     * @param brokerId id of the broker to query
     * @param tp partition whose log is requested
     * @return the log if the replica manager has one for the partition, otherwise empty
     */
    private Option<AbstractLog> getLog(int brokerId, TopicIdPartition tp) {
        KafkaBroker owner = (KafkaBroker)this.brokerForId(brokerId).get();
        return owner.replicaManager().getLog(tp.topicPartition());
    }

    /*
     * WARNING - void declaration
     */
    private void appendToTopicAndWaitUntilTiered(int topicLeader, TopicIdPartition topicIdPartition) {
        int numMessages = 100;
        int n = 0;
        AbstractLog log = (AbstractLog)this.getLog(topicLeader, topicIdPartition).get();
        while (log.numberOfSegments() <= 3) {
            TestUtils$.MODULE$.generateAndProduceMessages(this.brokers().toSeq(), topicIdPartition.topic(), numMessages, -1);
            n += numMessages;
        }
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 60000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!(log.logEndOffset() == (long)n && log.tieredLogSegments().size() >= log.numberOfSegments() - 1)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"Timeout waiting for all messages to be written and tiered");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
    }

    /**
     * Reads the raw bytes of the {@code tier.offsets} file of every log dir on every broker.
     * First waits, per broker, until the file exists in all of its log dirs and has more than
     * one line.
     *
     * @return broker id -> (log dir -> contents of that log dir's {@code tier.offsets})
     * @throws java.io.UncheckedIOException if a {@code tier.offsets} file cannot be read
     */
    private HashMap<Integer, HashMap<String, ByteBuffer>> getTierOffsets() {
        HashMap<Integer, HashMap<String, ByteBuffer>> tierOffsets = new HashMap<Integer, HashMap<String, ByteBuffer>>();
        Buffer<KafkaBroker> allBrokers = this.brokers();
        // Wait for every broker before reading anything, so all files are flushed first.
        for (int i = 0; i < allBrokers.length(); i++) {
            TierRecoveryDataUploadIntegrationTest.$anonfun$getTierOffsets$1((KafkaBroker)allBrokers.apply(i));
        }
        for (int i = 0; i < allBrokers.length(); i++) {
            KafkaBroker broker = (KafkaBroker)allBrokers.apply(i);
            HashMap<String, ByteBuffer> logDirTierOffsets = new HashMap<String, ByteBuffer>();
            Seq<String> logDirs = broker.config().logDirs();
            for (int d = 0; d < logDirs.length(); d++) {
                String logDir = (String)logDirs.apply(d);
                Path tierOffsetsPath = new File(logDir, "/tier.offsets").toPath();
                try {
                    logDirTierOffsets.put(logDir, ByteBuffer.wrap(Files.readAllBytes(tierOffsetsPath)));
                }
                catch (java.io.IOException e) {
                    throw new java.io.UncheckedIOException("Failed to read " + tierOffsetsPath, e);
                }
            }
            tierOffsets.put(Integer.valueOf(broker.config().brokerId()), logDirTierOffsets);
        }
        return tierOffsets;
    }

    /**
     * Finds the object-store key of the uploaded tier partition state file for a partition.
     * Lists the objects under the recovery upload prefix and returns the first key, if any.
     *
     * @param store object store to list
     * @param identifier identifier of the upload job
     * @param broker id of the broker that performed the upload
     * @param topicIdPartition partition whose upload is looked up
     * @return the first key found under the prefix, or empty if nothing is listed
     */
    private Option<String> tierPartitionStateUploadName(TierObjectStore store, String identifier, Integer broker, TopicIdPartition topicIdPartition) {
        String uploadPrefix = FileTierPartitionStateRecoveryUploadMetadata.pathPrefix((String)"", (String)identifier, (Integer)broker, (TopicIdPartition)topicIdPartition);
        Iterator<?> keys = store.listObject(uploadPrefix, false).keySet().iterator();
        if (keys.hasNext()) {
            return new Some<String>((String)keys.next());
        }
        return Option.empty();
    }

    /**
     * Downloads and parses the recovery metadata JSON uploaded by a broker, asserting that it
     * lists exactly the expected partitions and carries the current metadata version.
     *
     * @param store object store to read from
     * @param identifier identifier of the upload job
     * @param broker id of the broker that performed the upload
     * @param topicIdPartitions partitions expected in the metadata
     * @return the parsed metadata
     */
    private TierRecoveryUploadMetadataJson downloadRecoveryMetadata(TierObjectStore store, String identifier, Integer broker, HashSet<TopicIdPartition> topicIdPartitions) {
        TierRecoveryUploadMetadata metadata = new TierRecoveryUploadMetadata(identifier, broker);
        // The stream is opened in the resource specification so that it is always closed.
        try (InputStream stream = store.getObjectStoreFragment((ObjectStoreMetadata)metadata, FragmentType.TIER_RECOVERY_METADATA_UPLOAD).getInputStream()) {
            String resp = Source$.MODULE$.fromInputStream(stream, Codec$.MODULE$.fallbackSystemCodec()).mkString();
            TierRecoveryUploadMetadataJson metadataJson = (TierRecoveryUploadMetadataJson)new ObjectMapper().readValue(resp, TierRecoveryUploadMetadataJson.class);
            Assertions.assertEquals(topicIdPartitions.stream().map(tp -> tp.toString()).collect(Collectors.toSet()), metadataJson.partitions.keySet());
            Assertions.assertEquals((Integer)TierRecoveryDataUploadCoordinator.CURRENT_METADATA_VERSION, (Integer)metadataJson.version);
            return metadataJson;
        }
        catch (Exception exception) {
            // Assertion failures are Errors and pass through; everything else fails the test
            // with the original exception attached as the cause.
            return (TierRecoveryUploadMetadataJson)Assertions.fail("Failed to upload recovery metadata file", exception);
        }
    }

    /**
     * Checks every partition entry of the recovery metadata: its object store path must equal
     * the path recorded for the partition, and its leader flag must be set exactly when
     * {@code broker} is the recorded leader of the partition.
     *
     * @param metadataJson parsed recovery metadata to validate
     * @param topicIdPartitionToPath expected object store path per partition
     * @param topicIdPartitionToLeader leader broker id per partition
     * @param broker id of the broker that uploaded the metadata
     */
    private void validateRecoveryMetadata(TierRecoveryUploadMetadataJson metadataJson, scala.collection.mutable.Map<TopicIdPartition, String> topicIdPartitionToPath, scala.collection.mutable.Map<TopicIdPartition, Integer> topicIdPartitionToLeader, Integer broker) {
        metadataJson.partitions.forEach((tp, partitionMetadata) -> {
            TopicIdPartition topicIdPartition = TopicIdPartition.fromString((String)tp);
            String expectedPath = (String)topicIdPartitionToPath.apply(topicIdPartition);
            Integer leader = (Integer)topicIdPartitionToLeader.apply(topicIdPartition);
            Assertions.assertEquals((Object)expectedPath, (Object)partitionMetadata.objectStorePath);
            // Null-safe equality between the recorded leader and the uploading broker.
            boolean brokerIsLeader = leader == null ? broker == null : leader.equals(broker);
            Assertions.assertEquals((Object)Boolean.valueOf(brokerIsLeader), (Object)BoxesRunTime.boxToBoolean((boolean)partitionMetadata.isLeader));
        });
    }

    /**
     * Downloads the tier offsets object uploaded for one log dir of a broker.
     *
     * @param store object store to read from
     * @param logDir log dir the offsets were uploaded for
     * @param identifier identifier of the upload job
     * @param broker id of the broker that performed the upload
     * @return the full contents of the uploaded object
     */
    private ByteBuffer downloadTierOffsets(TierObjectStore store, String logDir, String identifier, Integer broker) {
        TierOffsetsRecoveryUploadMetadata metadata = new TierOffsetsRecoveryUploadMetadata(logDir, identifier, broker);
        // The stream is opened in the resource specification so that it is always closed.
        try (InputStream stream = store.getObjectStoreFragment((ObjectStoreMetadata)metadata, FragmentType.TIER_OFFSETS_UPLOAD).getInputStream()) {
            return ByteBuffer.wrap(IOUtils.toByteArray((InputStream)stream));
        }
        catch (Exception exception) {
            // Fail the test with the original exception attached as the cause.
            return (ByteBuffer)Assertions.fail("Failed to upload tier offsets file", exception);
        }
    }

    /**
     * Asserts that the tier offsets uploaded by a broker equal the ones read from its local disk.
     *
     * @param tierOffsets local tier offsets, keyed by broker id and then by log dir
     * @param uploadedTierOffsets downloaded tier offsets of {@code broker}, keyed by log dir
     * @param broker id of the broker being validated
     */
    private void validateTierOffsets(HashMap<Integer, HashMap<String, ByteBuffer>> tierOffsets, HashMap<String, ByteBuffer> uploadedTierOffsets, Integer broker) {
        HashMap<String, ByteBuffer> expectedForBroker = tierOffsets.get(broker);
        Assertions.assertEquals(expectedForBroker, uploadedTierOffsets);
    }

    /**
     * Copies the uploaded tier partition state object into a fresh temporary directory.
     * If fetching or copying throws, the partially written file is deleted and the path is still
     * returned, so callers may receive a path that does not exist.
     *
     * @param store object store to read from
     * @param uploadMetadata metadata identifying the uploaded object
     * @param identifier unused by this method
     * @param broker unused by this method
     * @param topicIdPartition unused by this method
     * @return the local path the object was (or would have been) downloaded to
     */
    private Path downloadFTPSUploadBlob(TierObjectStore store, FileTierPartitionStateRecoveryUploadMetadata uploadMetadata, String identifier, Integer broker, TopicIdPartition topicIdPartition) {
        File scratchDir = TestUtils.tempDirectory(null, null);
        Path target = uploadMetadata.uploadObject().getRecoveryUploadPath(scratchDir.getPath());
        InputStream blob = null;
        try {
            blob = store.getObjectStoreFragment((ObjectStoreMetadata)uploadMetadata, FragmentType.FILE_TIER_PARTITION_STATE_UPLOAD).getInputStream();
            Files.copy(blob, target, StandardCopyOption.REPLACE_EXISTING);
        }
        catch (Exception ignored) {
            // Best effort: drop whatever was written; the failure itself is not reported.
            Files.deleteIfExists(target);
        }
        finally {
            if (blob != null) {
                blob.close();
            }
        }
        return target;
    }

    private void compareUploadAndFlushedFTPSEntries(CheckedFileIO ftpsFileChannel, CheckedFileIO uploadFileChannel, TopicIdPartition topicIdPartition) {
        Optional ftpsFileIteratorOpt = FileTierPartitionState.iterator((TopicPartition)topicIdPartition.topicPartition(), (CheckedFileIO)ftpsFileChannel);
        Predef$.MODULE$.assert(ftpsFileIteratorOpt.isPresent());
        FileTierPartitionIterator ftpsFileIterator = (FileTierPartitionIterator)ftpsFileIteratorOpt.get();
        Optional uploadFileIteratorOpt = FileTierPartitionState.iterator((TopicPartition)topicIdPartition.topicPartition(), (CheckedFileIO)uploadFileChannel);
        Predef$.MODULE$.assert(uploadFileIteratorOpt.isPresent());
        FileTierPartitionIterator uploadFileIterator = (FileTierPartitionIterator)uploadFileIteratorOpt.get();
        while (ftpsFileIterator.hasNext() && uploadFileIterator.hasNext()) {
            TierObjectMetadata ftpsFileObjectMetadata = (TierObjectMetadata)ftpsFileIterator.next();
            TierObjectMetadata uploadFileObjectMetadata = (TierObjectMetadata)uploadFileIterator.next();
            Predef$.MODULE$.assert(ftpsFileObjectMetadata.equals((Object)uploadFileObjectMetadata));
        }
        Predef$.MODULE$.assert(!ftpsFileIterator.hasNext() && !uploadFileIterator.hasNext());
    }

    /**
     * Starts a recovery data upload job on one broker's coordinator, appends the new job id and
     * the coordinator to the two accumulators, and asserts the job is reported as RUNNING.
     *
     * @param topicIdPartitions$1 partitions to upload
     * @param identifier$1 identifier of the upload job
     * @param jobIds$1 holder of the sequence of started job ids; replaced with an extended sequence
     * @param coordinators$1 holder of the sequence of coordinators; replaced with an extended sequence
     * @param replica broker whose coordinator is used
     */
    public static final /* synthetic */ void $anonfun$testTierRecoveryDataUpload$2(HashSet topicIdPartitions$1, String identifier$1, ObjectRef jobIds$1, ObjectRef coordinators$1, KafkaBroker replica) {
        TierRecoveryDataUploadCoordinator uploadCoordinator = (TierRecoveryDataUploadCoordinator)replica.tierRecoveryDataUploadCoordinatorOpt().get();
        UUID startedJob = uploadCoordinator.initiateTierRecoveryDataUpload((Set)topicIdPartitions$1, identifier$1, 2);

        scala.collection.mutable.Seq knownJobs = (scala.collection.mutable.Seq)jobIds$1.elem;
        jobIds$1.elem = (scala.collection.mutable.Seq)knownJobs.$colon$plus((Object)startedJob);

        scala.collection.mutable.Seq knownCoordinators = (scala.collection.mutable.Seq)coordinators$1.elem;
        coordinators$1.elem = (scala.collection.mutable.Seq)knownCoordinators.$colon$plus((Object)uploadCoordinator);

        TierRecoveryDataUploadJobStatus statusAfterStart = uploadCoordinator.getJobResult(startedJob).status();
        Assertions.assertEquals((Object)TierRecoveryDataUploadJobStatus.RUNNING, (Object)statusAfterStart);
    }

    /**
     * Polling predicate: reports whether the given job's status equals COMPLETED.
     *
     * @param coordinator$1 coordinator that owns the job
     * @param jobId$1 id of the job to check
     * @return {@code true} if the job status equals {@code COMPLETED} (null-safe comparison)
     */
    public static final /* synthetic */ boolean $anonfun$testTierRecoveryDataUpload$4(TierRecoveryDataUploadCoordinator coordinator$1, UUID jobId$1) {
        TierRecoveryDataUploadJobStatus current = coordinator$1.getJobResult(jobId$1).status();
        TierRecoveryDataUploadJobStatus completed = TierRecoveryDataUploadJobStatus.COMPLETED;
        if (current == null) {
            return completed == null;
        }
        return current.equals(completed);
    }

    /**
     * @return the failure message used when an upload job does not complete in time
     */
    public static final /* synthetic */ String $anonfun$testTierRecoveryDataUpload$5() {
        final String timeoutMessage = "Timed out waiting for job to complete";
        return timeoutMessage;
    }

    /**
     * Polling predicate: reports whether everything produced has been written and tiered.
     *
     * @param log$1 log being monitored
     * @param totalMessages$1 holder of the number of messages produced so far
     * @return {@code true} if the log end offset equals the produced count and at most one
     *         segment is not yet tiered
     */
    public static final /* synthetic */ boolean $anonfun$appendToTopicAndWaitUntilTiered$1(AbstractLog log$1, IntRef totalMessages$1) {
        if (log$1.logEndOffset() != (long)totalMessages$1.elem) {
            return false;
        }
        return log$1.tieredLogSegments().size() >= log$1.numberOfSegments() - 1;
    }

    /**
     * @return the failure message used when produced messages are not written and tiered in time
     */
    public static final /* synthetic */ String $anonfun$appendToTopicAndWaitUntilTiered$2() {
        final String timeoutMessage = "Timeout waiting for all messages to be written and tiered";
        return timeoutMessage;
    }

    /**
     * Reports whether the {@code tier.offsets} file of a log dir exists and has more than one line.
     *
     * @param logDir log directory expected to contain {@code tier.offsets}
     * @return {@code true} if the file exists and contains at least two lines
     * @throws java.io.UncheckedIOException if the file exists but cannot be opened
     */
    public static final /* synthetic */ boolean $anonfun$getTierOffsets$3(String logDir) {
        Path tierOffsetsPath = new File(logDir, "/tier.offsets").toPath();
        if (!Files.exists(tierOffsetsPath, new LinkOption[0])) {
            return false;
        }
        // Files.lines keeps the file open until the stream is closed; this runs in a polling
        // loop, so the stream must be closed on every call.
        try (java.util.stream.Stream<String> lines = Files.lines(tierOffsetsPath)) {
            return lines.count() > 1L;
        }
        catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to read " + tierOffsetsPath, e);
        }
    }

    /**
     * Polling predicate: reports whether every log dir of the broker has a flushed
     * {@code tier.offsets} file (existing, with more than one line).
     *
     * @param broker$1 broker whose log dirs are checked
     * @return {@code true} if the check holds for all log dirs of the broker
     */
    public static final /* synthetic */ boolean $anonfun$getTierOffsets$2(KafkaBroker broker$1) {
        // The lambda target is a raw Function1, so its parameter is an Object and must be
        // narrowed to String before it is passed on.
        return broker$1.config().logDirs().forall((Function1 & Serializable)logDir -> BoxesRunTime.boxToBoolean((boolean)TierRecoveryDataUploadIntegrationTest.$anonfun$getTierOffsets$3((String)logDir)));
    }

    /**
     * @return the failure message used when {@code tier.offsets} is not flushed in time
     */
    public static final /* synthetic */ String $anonfun$getTierOffsets$4() {
        final String timeoutMessage = "timed out while waiting for tier.offsets to be flushed to disk in all logDirs";
        return timeoutMessage;
    }

    /**
     * Blocks until the broker's {@code tier.offsets} files are flushed in all of its log dirs,
     * polling every 100ms and failing the test once more than 15s have elapsed.
     *
     * @param broker broker to wait for
     */
    public static final /* synthetic */ void $anonfun$getTierOffsets$1(KafkaBroker broker) {
        final long pollIntervalMs = 100L;
        final long maxWaitMs = 15000L;
        final long beganAt = System.currentTimeMillis();
        while (true) {
            if (TierRecoveryDataUploadIntegrationTest.$anonfun$getTierOffsets$2(broker)) {
                return;
            }
            if (System.currentTimeMillis() > beganAt + maxWaitMs) {
                Assertions.fail((String)"timed out while waiting for tier.offsets to be flushed to disk in all logDirs");
            }
            // Never sleep longer than the overall timeout.
            Thread.sleep(Math.min(maxWaitMs, pollIntervalMs));
        }
    }

    public TierRecoveryDataUploadIntegrationTest() {
        this.numBrokers = 3;
        this.overridingProps().setProperty(KafkaConfig$.MODULE$.TierFeatureProp(), "true");
        this.overridingProps().setProperty(KafkaConfig$.MODULE$.TierBackendProp(), "mock");
        this.overridingProps().setProperty(KafkaConfig$.MODULE$.TierS3BucketProp(), "mybucket");
        this.overridingProps().setProperty(KafkaConfig$.MODULE$.TierLocalHotsetBytesProp(), "0");
        this.overridingProps().setProperty(KafkaConfig$.MODULE$.TierTopicDeleteCheckIntervalMsProp(), "5");
        this.overridingProps().setProperty("confluent.checksum.enabled.files", "tierstate");
    }
}

