/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.tasks.compaction;

import com.typesafe.scalalogging.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import kafka.log.TierLogSegment;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierCompactionCommitAndSwap;
import kafka.tier.domain.TierSegmentUploadInitiate;
import kafka.tier.exceptions.NotTierablePartitionException;
import kafka.tier.exceptions.TierLogCompactionFatalException;
import kafka.tier.exceptions.TierLogCompactionFatalException$;
import kafka.tier.exceptions.TierLogCompactionFencedException;
import kafka.tier.exceptions.TierLogCompactionFencedException$;
import kafka.tier.exceptions.TierMetadataRetriableException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.OffsetIndexFetchRequest;
import kafka.tier.fetcher.TierAbortedTxnReader;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.TierStateFetcher;
import kafka.tier.state.TierPartitionState;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.TierObjectStoreRetryPolicy;
import kafka.tier.store.objects.FragmentType;
import kafka.tier.store.objects.metadata.ObjectMetadata;
import kafka.tier.tasks.compaction.CompactionTask;
import kafka.tier.topic.TierTopicAppender;
import kafka.utils.Logging;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogSegment;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.java8.JFunction0;

public final class TierLogCompactionUtils$
implements Logging {
    public static TierLogCompactionUtils$ MODULE$;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    static {
        new TierLogCompactionUtils$();
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public String loggerName() {
        return CompactionTask.class.getName();
    }

    public boolean isCompactAndDelete(LogConfig logConfig) {
        return logConfig.compact() && logConfig.delete();
    }

    public int startingPosition(CancellationContext ctx, TierLogSegment segment, TierObjectStore tierObjectStore) {
        return BoxesRunTime.unboxToInt(this.maybeHandleIOException((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Error while computing starting position for segment ").append(segment).toString(), (Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> {
            int n;
            int indexPos = OffsetIndexFetchRequest.fetchOffsetPositionForStartingOffset((CancellationContext)ctx$2, (TierObjectStore)tierObjectStore$2, (ObjectMetadata)segment$1.metadata(), (long)segment$1.baseOffset()).startOffsetPosition.position;
            TierObjectStoreResponse response = tierObjectStore.getObjectStoreFragment(segment.metadata(), FragmentType.SEGMENT, Predef$.MODULE$.long2Long((long)indexPos));
            TierSegmentReader reader = new TierSegmentReader(new StringBuilder(37).append("CleanerFetch(segment=").append(segment).append(", targetOffset=").append(segment.baseOffset()).append(")").toString());
            try {
                n = indexPos + reader.positionForOffset(ctx, response.getInputStream(), segment.baseOffset(), Integer.MAX_VALUE);
            }
            finally {
                response.close();
            }
            return n;
        }));
    }

    public int startingPosition(TierLogSegment segment, TierObjectStore tierObjectStore, CancellationContext ctx, TierObjectStoreRetryPolicy retryPolicy) {
        return BoxesRunTime.unboxToInt(this.maybeRetry((Function0<TierObjectStoreRetryPolicy>)(Function0 & Serializable & scala.Serializable)() -> retryPolicy, ctx, (Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> MODULE$.startingPosition(ctx, segment, tierObjectStore)));
    }

    public void readSegment(TierLogSegment segment, TierObjectStore tierObjectStore, int startBytePosition, ByteBuffer readBuffer) {
        try (TierObjectStoreResponse response = tierObjectStore.getObjectStoreFragment(segment.metadata(), FragmentType.SEGMENT, Predef$.MODULE$.long2Long((long)startBytePosition), Predef$.MODULE$.long2Long((long)startBytePosition + (long)readBuffer.capacity()));){
            this.maybeHandleIOException((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(28).append("Error while reading segment ").append(segment).toString(), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> Utils.readFully((InputStream)response.getInputStream(), (ByteBuffer)readBuffer, (boolean)false));
            readBuffer.flip();
        }
    }

    public void readSegment(TierLogSegment segment, TierObjectStore tierObjectStore, int startBytePosition, ByteBuffer readBuffer, CancellationContext ctx, TierObjectStoreRetryPolicy retryPolicy) {
        this.maybeRetry((Function0<TierObjectStoreRetryPolicy>)(Function0 & Serializable & scala.Serializable)() -> retryPolicy, ctx, (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> MODULE$.readSegment(segment, tierObjectStore, startBytePosition, readBuffer));
    }

    public Option<ByteBuffer> fetchEpochState(TierObjectStore tierObjectStore, TierLogSegment finalSegment) {
        if (finalSegment.metadata().hasEpochState()) {
            Some some;
            try (TierObjectStoreResponse response = tierObjectStore.getObjectStoreFragment(finalSegment.metadata(), FragmentType.EPOCH_STATE);){
                some = new Some((Object)ByteBuffer.wrap((byte[])this.maybeHandleIOException((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Error while reading epoch state for segment ").append(finalSegment).toString(), (Function0 & Serializable & scala.Serializable)() -> Utils.readFullyToArray((InputStream)response.getInputStream(), (int)Predef$.MODULE$.Integer2int(TierStateFetcher.ESTIMATED_BUFFER_SIZE)))));
            }
            return some;
        }
        return None$.MODULE$;
    }

    public Option<ByteBuffer> fetchEpochState(TierObjectStore tierObjectStore, TierLogSegment finalSegment, CancellationContext ctx, TierObjectStoreRetryPolicy retryPolicy) {
        return (Option)this.maybeRetry((Function0<TierObjectStoreRetryPolicy>)(Function0 & Serializable & scala.Serializable)() -> retryPolicy, ctx, (Function0 & Serializable & scala.Serializable)() -> MODULE$.fetchEpochState(tierObjectStore, finalSegment));
    }

    public Option<ByteBuffer> fetchProducerState(TierObjectStore tierObjectStore, TierLogSegment finalSegment) {
        if (finalSegment.metadata().hasProducerState()) {
            Some some;
            try (TierObjectStoreResponse response = tierObjectStore.getObjectStoreFragment(finalSegment.metadata(), FragmentType.PRODUCER_STATE);){
                some = new Some((Object)ByteBuffer.wrap((byte[])this.maybeHandleIOException((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(47).append("Error while reading producer state for segment ").append(finalSegment).toString(), (Function0 & Serializable & scala.Serializable)() -> Utils.readFullyToArray((InputStream)response.getInputStream(), (int)Predef$.MODULE$.Integer2int(TierStateFetcher.ESTIMATED_BUFFER_SIZE)))));
            }
            return some;
        }
        return None$.MODULE$;
    }

    public Option<ByteBuffer> fetchProducerState(TierObjectStore tierObjectStore, TierLogSegment finalSegment, CancellationContext ctx, TierObjectStoreRetryPolicy retryPolicy) {
        return (Option)this.maybeRetry((Function0<TierObjectStoreRetryPolicy>)(Function0 & Serializable & scala.Serializable)() -> retryPolicy, ctx, (Function0 & Serializable & scala.Serializable)() -> MODULE$.fetchProducerState(tierObjectStore, finalSegment));
    }

    public List<AbortedTxn> fetchAbortedTransactions(TierObjectStore tierObjectStore, TierLogSegment segment, CancellationContext ctx) {
        if (segment.metadata().hasAbortedTxns()) {
            List list;
            try (TierObjectStoreResponse response = tierObjectStore.getObjectStoreFragment(segment.metadata(), FragmentType.TRANSACTION_INDEX);){
                list = (List)this.maybeHandleIOException((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("Error while fetching aborted transactions for segment ").append(segment).toString(), (Function0 & Serializable & scala.Serializable)() -> ((TraversableOnce)CollectionConverters$.MODULE$.asScalaBufferConverter(TierAbortedTxnReader.readInto(ctx, response.getInputStream(), segment.baseOffset(), segment.endOffset())).asScala()).toList());
            }
            return list;
        }
        return List$.MODULE$.empty();
    }

    public List<AbortedTxn> fetchAbortedTransactions(TierObjectStore tierObjectStore, TierLogSegment segment, CancellationContext ctx, TierObjectStoreRetryPolicy retryPolicy) {
        return (List)this.maybeRetry((Function0<TierObjectStoreRetryPolicy>)(Function0 & Serializable & scala.Serializable)() -> retryPolicy, ctx, (Function0 & Serializable & scala.Serializable)() -> MODULE$.fetchAbortedTransactions(tierObjectStore, segment, ctx));
    }

    public void writeMarker(TierTopicAppender tierTopicAppender, AbstractTierMetadata marker) {
        TierPartitionState.AppendResult result;
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeMarker$1(marker)));
        }
        if (((Object)((Object)TierPartitionState.AppendResult.ACCEPTED)).equals((Object)(result = tierTopicAppender.addMetadata(marker).get()))) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeMarker$2(marker)));
                return;
            }
            return;
        }
        if (((Object)((Object)TierPartitionState.AppendResult.FENCED)).equals((Object)result)) {
            if (this.logger().underlying().isErrorEnabled()) {
                this.logger().underlying().error(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeMarker$3(marker)));
            }
            throw new TierLogCompactionFencedException(marker.topicIdPartition(), TierLogCompactionFencedException$.MODULE$.$lessinit$greater$default$2());
        }
        if (((Object)((Object)TierPartitionState.AppendResult.FAILED)).equals((Object)result)) {
            if (this.logger().underlying().isWarnEnabled()) {
                this.logger().underlying().warn(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeMarker$4(marker)));
            }
            throw new TierMetadataRetriableException(marker.topicIdPartition().toString());
        }
        if (((Object)((Object)TierPartitionState.AppendResult.NOT_TIERABLE)).equals((Object)result)) {
            throw new NotTierablePartitionException(marker.topicIdPartition());
        }
        if (this.logger().underlying().isErrorEnabled()) {
            this.logger().underlying().error(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeMarker$5(marker, result)));
        }
        throw new TierLogCompactionFatalException(new StringBuilder(31).append("Unexpected append result for ").append(marker.topicIdPartition()).append(": ").append((Object)result).toString(), TierLogCompactionFatalException$.MODULE$.$lessinit$greater$default$2());
    }

    public void writeUploadInitiateMarker(TierSegmentUploadInitiate uploadInitiate, TopicIdPartition topicIdPartition, Time time, TierTopicAppender tierTopicAppender) {
        long startTimeMs = time.milliseconds();
        this.writeMarker(tierTopicAppender, uploadInitiate);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeUploadInitiateMarker$1(uploadInitiate, topicIdPartition, time, startTimeMs)));
            return;
        }
    }

    public void writeCommitAndSwapMarker(TierCompactionCommitAndSwap commitAndSwap, TopicIdPartition topicIdPartition, Time time, TierTopicAppender tierTopicAppender) {
        long startTimeMs = time.milliseconds();
        this.writeMarker(tierTopicAppender, commitAndSwap);
        if (this.logger().underlying().isInfoEnabled()) {
            this.logger().underlying().info(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$writeCommitAndSwapMarker$1(commitAndSwap, topicIdPartition, time, startTimeMs)));
            return;
        }
    }

    public boolean shouldCompactNextSourceSegmentIntoDestSegment(TopicIdPartition topicIdPartition, LogSegment destSegment, TierLogSegment nextSegment, int targetSegmentSize, int minSegmentSize, int maxSegmentSize) {
        if (!destSegment.canConvertToRelativeOffset(nextSegment.endOffset())) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$shouldCompactNextSourceSegmentIntoDestSegment$1(topicIdPartition)));
            }
            return false;
        }
        if (destSegment.size() == 0) {
            return true;
        }
        if (destSegment.size() > maxSegmentSize - nextSegment.size()) {
            if (this.logger().underlying().isWarnEnabled()) {
                this.logger().underlying().warn(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$shouldCompactNextSourceSegmentIntoDestSegment$2(topicIdPartition, destSegment, nextSegment, maxSegmentSize)));
            }
            return false;
        }
        if (destSegment.size() >= minSegmentSize && destSegment.size() > targetSegmentSize - nextSegment.size()) {
            if (this.logger().underlying().isInfoEnabled()) {
                this.logger().underlying().info(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$shouldCompactNextSourceSegmentIntoDestSegment$3(topicIdPartition, destSegment, nextSegment, targetSegmentSize)));
            }
            return false;
        }
        return true;
    }

    public <T> T maybeRetry(Function0<TierObjectStoreRetryPolicy> retryPolicy, CancellationContext ctx, Function0<T> fun) {
        IntRef retryCount = IntRef.create((int)0);
        while (true) {
            if (ctx.isCancelled()) {
                throw new CancellationException("Cancellation context has been marked canceled, cancelling the operation.");
            }
            try {
                ++retryCount.elem;
                return (T)fun.apply();
            }
            catch (Exception e) {
                TierObjectStoreRetryPolicy.RetryInfo retryInfo = ((TierObjectStoreRetryPolicy)retryPolicy.apply()).shouldRetry(e, retryCount.elem);
                if (!retryInfo.shouldRetry()) {
                    if (this.logger().underlying().isErrorEnabled()) {
                        this.logger().underlying().error(this.msgWithLogIdent(TierLogCompactionUtils$.$anonfun$maybeRetry$1(retryCount)), (Throwable)e);
                    }
                    throw e;
                }
                Thread.sleep(retryInfo.getRetryInterval());
                continue;
            }
            break;
        }
    }

    private <T> T maybeHandleIOException(Function0<String> errorMsg, Function0<T> fun) {
        try {
            return (T)fun.apply();
        }
        catch (IOException e) {
            throw new TierObjectStoreRetriableException((String)errorMsg.apply(), e);
        }
        catch (UncheckedIOException e) {
            throw new TierObjectStoreRetriableException((String)errorMsg.apply(), e);
        }
    }

    public static final /* synthetic */ String $anonfun$writeMarker$1(AbstractTierMetadata marker$1) {
        return new StringBuilder(14).append("Emitting ").append(marker$1).append(" for ").append(marker$1.topicIdPartition()).toString();
    }

    public static final /* synthetic */ String $anonfun$writeMarker$2(AbstractTierMetadata marker$1) {
        return new StringBuilder(15).append("Finalized ").append(marker$1).append(" for ").append(marker$1.topicIdPartition()).toString();
    }

    public static final /* synthetic */ String $anonfun$writeMarker$3(AbstractTierMetadata marker$1) {
        return new StringBuilder(63).append("Stopping state machine for ").append(marker$1.topicIdPartition()).append(" as attempt to transition was fenced").toString();
    }

    public static final /* synthetic */ String $anonfun$writeMarker$4(AbstractTierMetadata marker$1) {
        return new StringBuilder(72).append("Backing off compaction process for ").append(marker$1.topicIdPartition()).append(" as attempt to transition was failed.").toString();
    }

    public static final /* synthetic */ String $anonfun$writeMarker$5(AbstractTierMetadata marker$1, TierPartitionState.AppendResult result$2) {
        return new StringBuilder(31).append("Unexpected append result for ").append(marker$1.topicIdPartition()).append(": ").append((Object)result$2).toString();
    }

    public static final /* synthetic */ String $anonfun$writeUploadInitiateMarker$1(TierSegmentUploadInitiate uploadInitiate$1, TopicIdPartition topicIdPartition$3, Time time$3, long startTimeMs$1) {
        return new StringBuilder(40).append(uploadInitiate$1).append(" for ").append(topicIdPartition$3).append(" was successfully written in in ").append(time$3.milliseconds() - startTimeMs$1).append(" ms").toString();
    }

    public static final /* synthetic */ String $anonfun$writeCommitAndSwapMarker$1(TierCompactionCommitAndSwap commitAndSwap$1, TopicIdPartition topicIdPartition$4, Time time$4, long startTimeMs$2) {
        return new StringBuilder(40).append(commitAndSwap$1).append(" for ").append(topicIdPartition$4).append(" was successfully written in in ").append(time$4.milliseconds() - startTimeMs$2).append(" ms").toString();
    }

    public static final /* synthetic */ String $anonfun$shouldCompactNextSourceSegmentIntoDestSegment$1(TopicIdPartition topicIdPartition$5) {
        return new StringBuilder(121).append(topicIdPartition$5).append(": next source segment may have offsets that are too large for the current dest segment's ").append("base offset, completing segment.").toString();
    }

    public static final /* synthetic */ String $anonfun$shouldCompactNextSourceSegmentIntoDestSegment$2(TopicIdPartition topicIdPartition$5, LogSegment destSegment$1, TierLogSegment nextSegment$1, int maxSegmentSize$1) {
        return new StringBuilder(123).append(topicIdPartition$5).append(": combined size of dest segment(").append(destSegment$1.size()).append(") and next source segment(").append(nextSegment$1).append(")").append(" exceeds max compacted segment size limit(").append(maxSegmentSize$1).append("), completing segment.").toString();
    }

    public static final /* synthetic */ String $anonfun$shouldCompactNextSourceSegmentIntoDestSegment$3(TopicIdPartition topicIdPartition$5, LogSegment destSegment$1, TierLogSegment nextSegment$1, int targetSegmentSize$1) {
        return new StringBuilder(116).append(topicIdPartition$5).append(": combined size of dest segment(").append(destSegment$1.size()).append(") and next source segment(").append(nextSegment$1).append(")").append(" exceeds target segment size limit(").append(targetSegmentSize$1).append("), completing segment.").toString();
    }

    public static final /* synthetic */ String $anonfun$maybeRetry$1(IntRef retryCount$1) {
        return new StringBuilder(30).append("Fail operation after ").append(retryCount$1.elem).append(" retries.").toString();
    }

    private TierLogCompactionUtils$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}

