/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.state;

import io.confluent.kafka.storage.checksum.Algorithm;
import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.Defaults$;
import kafka.tier.TierTestUtils$;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierPartitionForceRestore;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.state.ChecksumUtils;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.StateScan;
import kafka.tier.state.StateSeek;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStateCleanupConfig;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.store.OpaqueData;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import scala.Function1;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005)3A\u0001B\u0003\u0001\u0019!)1\u0003\u0001C\u0001)!)q\u0003\u0001C\u00011!)\u0011\t\u0001C\u0001\u0005\n\tC+[3s!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;f\u0007>t7-\u001e:sK:\u001c\u0017\u0010V3ti*\u0011aaB\u0001\u0006gR\fG/\u001a\u0006\u0003\u0011%\tA\u0001^5fe*\t!\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001i\u0001C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002+A\u0011a\u0003A\u0007\u0002\u000b\u0005y\"/Z1e/JLG/\u001a%piN<\u0018\r]\"p]\u000e,(O]3oGf$Vm\u001d;\u0015\u0007ea\u0012\u0005\u0005\u0002\u000f5%\u00111d\u0004\u0002\u0005+:LG\u000fC\u0003\u001e\u0005\u0001\u0007a$A\bdQ\u0016\u001c7n];n\u000b:\f'\r\\3e!\tqq$\u0003\u0002!\u001f\t9!i\\8mK\u0006t\u0007\"\u0002\u0012\u0003\u0001\u0004q\u0012AD2mK\u0006tW\u000f]#oC\ndW\r\u001a\u0015\u0003\u0005\u0011\u0002\"!\n\u0018\u000e\u0003\u0019R!a\n\u0015\u0002\rA\f'/Y7t\u0015\tI#&A\u0004kkBLG/\u001a:\u000b\u0005-b\u0013!\u00026v]&$(\"A\u0017\u0002\u0007=\u0014x-\u0003\u00020M\t\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;)\t\t\tt\u0007\u000f\t\u0003eUj\u0011a\r\u0006\u0003i\u0019\n\u0001\u0002\u001d:pm&$WM]\u0005\u0003mM\u0012\u0011bQ:w'>,(oY3\u0002\u000bY\fG.^3-\teZThP\u0011\u0002u\u0005IAO];fYQ\u0014X/Z\u0011\u0002y\u0005Qa-\u00197tK2\"(/^3\"\u0003y\n!\u0002\u001e:vK22\u0017\r\\:fC\u0005\u0001\u0015a\u00034bYN,GFZ1mg\u0016\f\u0001D]3bI^\u0013\u0018\u000e^3D_:\u001cWO\u001d:f]\u000eLH+Z:u)\rI2)\u0012\u0005\u0006\t\u000e\u0001\rAH\u0001\u0017G\",7m[:v[\u0006sGmQ8na\u0006\u001cGO\u00127bO\")ai\u0001a\u0001=\u0005Y1\r\\3b]V\u0004h\t\\1hQ\t\u0019A\u0005\u000b\u0003\u0004c]JE\u0006B\u001d<{}\u0002")
public class TierPartitionStateConcurrencyTest {
    /**
     * Runs background {@code StateSeek} readers against a {@link FileTierPartitionState} while the
     * calling thread, for roughly {@code runLengthMs} milliseconds, repeatedly:
     * <ol>
     *   <li>snapshots the bytes of the flushed state file,</li>
     *   <li>uploads one more segment's metadata and flushes,</li>
     *   <li>snapshots the flushed file again, and</li>
     *   <li>replays two force-restore events: first with the pre-upload snapshot, then with the
     *       post-upload snapshot.</li>
     * </ol>
     * The test passes when no reader has stored a failure in the shared {@code exception} holder.
     *
     * <p>Readers are signalled to stop and joined before the holder is inspected, so a failure
     * recorded late by a reader is not missed. {@code shutdown} is also set on the failure path so
     * the (non-daemon) reader threads are not left running if the write loop throws.
     *
     * @param checksumEnabled forwarded to the {@code FileTierPartitionState} constructor
     * @param cleanupEnabled  forwarded to {@code TierPartitionStateCleanupConfig}
     * @throws Exception if reading the state file fails or the calling thread is interrupted while
     *                   waiting for the readers
     */
    @ParameterizedTest
    @CsvSource(value={"true,true", "false,true", "true,false", "false,false"})
    public void readWriteHotswapConcurrencyTest(boolean checksumEnabled, boolean cleanupEnabled) throws Exception {
        File baseDir = TestUtils.tempDirectory(null, null);
        String topic = UUID.randomUUID().toString();
        int partition = 0;
        UUID topicId = UUID.randomUUID();
        TopicIdPartition tpid = new TopicIdPartition(topic, topicId, partition);
        TopicPartition tp = tpid.topicPartition();
        int runLengthMs = 500;
        int nThreads = 8;
        int epoch = 0;
        // Upper bound on how long to wait for each reader once shutdown has been signalled.
        long readerJoinTimeoutMs = 10000L;
        MockTime time = new MockTime();
        long cleanupDelayMs = Defaults$.MODULE$.TierPartitionStateCleanupDelayMs();
        int cleanupIntervalMs = 0;
        FileTierPartitionState state = new FileTierPartitionState(baseDir, new LogDirFailureChannel(5), tp, true, (Scheduler)time.scheduler, checksumEnabled, false, (Time)time, new TierPartitionStateCleanupConfig(cleanupEnabled, cleanupDelayMs, (long)cleanupIntervalMs), false, -1);
        state.setTopicId(tpid.topicId());
        state.beginCatchup();
        state.onCatchUpComplete();
        long startTime = System.currentTimeMillis();
        AtomicLong readOffset = new AtomicLong(-1L);
        AtomicReference exception = new AtomicReference();
        AtomicBoolean shutdown = new AtomicBoolean(false);
        // The original range was inclusive (0 to nThreads / 2), hence the "+ 1".
        Thread[] seekers = new Thread[nThreads / 2 + 1];
        try {
            for (int t = 0; t < seekers.length; ++t) {
                seekers[t] = new Thread(new StateSeek(state, shutdown, exception, readOffset));
                seekers[t].start();
            }
            state.append((AbstractTierMetadata)new TierTopicInitLeader(tpid, epoch, UUID.randomUUID(), 0), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
            state.flush();
            int i = 0;
            while (System.currentTimeMillis() < startTime + (long)runLengthMs) {
                // Snapshot of the flushed file before this iteration's upload.
                Path flushedPath = Paths.get(state.flushedPath());
                byte[] copyBytes = Files.readAllBytes(flushedPath);
                Algorithm algorithm = ChecksumUtils.tierStateFileAlgorithm((Path)flushedPath);
                TierPartitionForceRestore originalRestore = new TierPartitionForceRestore(tpid, UUID.randomUUID(), Predef$.MODULE$.Long2long(state.startOffset().orElse(Predef$.MODULE$.long2Long(-1L))), state.endOffset(), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch(), "contenthash", Predef$.MODULE$.boolean2Boolean(false), Predef$.MODULE$.Byte2byte(algorithm.id));
                TierTestUtils$.MODULE$.uploadWithMetadata((TierPartitionState)state, tpid, epoch, UUID.randomUUID(), (long)(i * 2), (long)(i * 2 + 1), (long)i, (long)i, i, false, true, false, new OffsetAndEpoch(0L, Optional.empty()), OpaqueData.ZEROED);
                state.flush();
                // Snapshot of the flushed file after the upload; restoring it last leaves the
                // state at the post-upload contents.
                TierPartitionForceRestore revertedRestore = new TierPartitionForceRestore(tpid, UUID.randomUUID(), Predef$.MODULE$.Long2long(state.startOffset().orElse(Predef$.MODULE$.long2Long(-1L))), state.endOffset(), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch(), "contenthash", Predef$.MODULE$.boolean2Boolean(false), Predef$.MODULE$.Byte2byte(algorithm.id));
                byte[] correctBytes = Files.readAllBytes(Paths.get(state.flushedPath()));
                state.processRestoreEvents((AbstractTierMetadata)originalRestore, Optional.of(ByteBuffer.wrap(copyBytes)), TierPartitionStatus.ONLINE, TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
                state.processRestoreEvents((AbstractTierMetadata)revertedRestore, Optional.of(ByteBuffer.wrap(correctBytes)), TierPartitionStatus.ONLINE, TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
                readOffset.set((i - 1) * 2);
                ++i;
            }
            shutdown.set(true);
            // Wait for the readers to finish instead of sleeping for a fixed interval, so that
            // any failure they record is visible to the assertion below.
            for (Thread seeker : seekers) {
                seeker.join(readerJoinTimeoutMs);
            }
            if (exception.get() != null) {
                ((Throwable)exception.get()).printStackTrace();
            }
            Assertions.assertNull(exception.get());
        }
        finally {
            // Idempotent on the success path; on the failure path this is what stops the readers.
            shutdown.set(true);
            state.delete();
        }
    }

    /**
     * Runs background {@code StateSeek} and {@code StateScan} readers against a
     * {@link FileTierPartitionState} while the calling thread, for roughly {@code runLengthMs}
     * milliseconds, repeatedly uploads one more segment's metadata, flushes, and publishes the
     * new segment's start offset through {@code latestStartOffset}.
     * The test passes when no reader has stored a failure in the shared {@code exception} holder.
     *
     * <p>Readers are signalled to stop and joined before the holder is inspected, so a failure
     * recorded late by a reader is not missed. {@code shutdown} is also set on the failure path so
     * the (non-daemon) reader threads are not left running if the write loop throws.
     *
     * @param checksumAndCompactFlag forwarded to two adjacent boolean arguments of the
     *                               {@code FileTierPartitionState} constructor
     * @param cleanupFlag            forwarded to {@code TierPartitionStateCleanupConfig}
     * @throws Exception if the calling thread is interrupted while waiting for the readers, or a
     *                   state operation fails
     */
    @ParameterizedTest
    @CsvSource(value={"true,true", "false,true", "true,false", "false,false"})
    public void readWriteConcurrencyTest(boolean checksumAndCompactFlag, boolean cleanupFlag) throws Exception {
        File baseDir = TestUtils.tempDirectory(null, null);
        String topic = UUID.randomUUID().toString();
        int partition = 0;
        UUID topicId = UUID.randomUUID();
        TopicIdPartition tpid = new TopicIdPartition(topic, topicId, partition);
        TopicPartition tp = tpid.topicPartition();
        int runLengthMs = 500;
        int nThreads = 8;
        int epoch = 0;
        // Upper bound on how long to wait for each reader once shutdown has been signalled.
        long readerJoinTimeoutMs = 10000L;
        MockTime time = new MockTime();
        long cleanupDelayMs = Defaults$.MODULE$.TierPartitionStateCleanupDelayMs();
        int cleanupIntervalMs = 0;
        FileTierPartitionState state = new FileTierPartitionState(baseDir, new LogDirFailureChannel(5), tp, true, (Scheduler)time.scheduler, checksumAndCompactFlag, checksumAndCompactFlag, (Time)time, new TierPartitionStateCleanupConfig(cleanupFlag, cleanupDelayMs, (long)cleanupIntervalMs), false, -1);
        state.setTopicId(tpid.topicId());
        state.beginCatchup();
        state.onCatchUpComplete();
        long startTime = System.currentTimeMillis();
        AtomicLong latestStartOffset = new AtomicLong(0L);
        AtomicReference exception = new AtomicReference();
        AtomicBoolean shutdown = new AtomicBoolean(false);
        // The original ranges were inclusive (0 to nThreads / 2), hence the "+ 1" per reader kind.
        int readersPerKind = nThreads / 2 + 1;
        Thread[] readers = new Thread[readersPerKind * 2];
        try {
            // All seekers are started first, then all scanners, as in the original ordering.
            for (int t = 0; t < readersPerKind; ++t) {
                readers[t] = new Thread(new StateSeek(state, shutdown, exception, latestStartOffset));
                readers[t].start();
            }
            for (int t = readersPerKind; t < readers.length; ++t) {
                readers[t] = new Thread(new StateScan(state, shutdown, exception, latestStartOffset));
                readers[t].start();
            }
            state.append((AbstractTierMetadata)new TierTopicInitLeader(tpid, epoch, UUID.randomUUID(), 0), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
            int i = 0;
            while (System.currentTimeMillis() < startTime + (long)runLengthMs) {
                TierTestUtils$.MODULE$.uploadWithMetadata((TierPartitionState)state, tpid, epoch, UUID.randomUUID(), (long)(i * 2), (long)(i * 2 + 1), (long)i, (long)i, i, false, true, false, OffsetAndEpoch.EMPTY, OpaqueData.ZEROED);
                state.flush();
                // Publish the offset only after the flush so readers see it once it is on disk.
                latestStartOffset.set(i * 2);
                ++i;
            }
            shutdown.set(true);
            // Wait for the readers to finish instead of sleeping for a fixed interval, so that
            // any failure they record is visible to the assertion below.
            for (Thread reader : readers) {
                reader.join(readerJoinTimeoutMs);
            }
            if (exception.get() != null) {
                ((Throwable)exception.get()).printStackTrace();
            }
            Assertions.assertNull(exception.get());
        }
        finally {
            // Idempotent on the success path; on the failure path this is what stops the readers.
            shutdown.set(true);
            state.delete();
        }
    }
}

