package kafka.tier.raft;

import io.confluent.kafka.concurrent.EventExecutor;
import io.confluent.kafka.raft.SimpleRaftTracer;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.VersionInformation;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.LoggingFaultHandler;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

/* loaded from: input_file:kafka/tier/raft/KRaftSnapshotManager.class */
public final class KRaftSnapshotManager implements SimpleRaftTracer {
    private final EventExecutor executor;
    private final TierObjectStore objectStore;
    private final Logger logger;
    private final Function<TopicIdPartition, Optional<Path>> topicIdPath;
    private final String clusterId;
    final Context context = new Context();
    private final FaultHandler faultHandler = new LoggingFaultHandler("raftSnapshotManager", () -> {
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tier/raft/KRaftSnapshotManager$Context.class */
    public static final class Context {
        int remoteListRetries = 0;
        Optional<SortedSet<KRaftSnapshotObject>> remoteObjects = Optional.empty();
        boolean uploadScheduled = false;
        int remotePutRetries = 0;
        final SortedSet<LocalSnapshotObject> localObjects = new TreeSet();

        Context() {
        }
    }

    private KRaftSnapshotManager(EventExecutor eventExecutor, TierObjectStore tierObjectStore, LogContext logContext, Function<TopicIdPartition, Optional<Path>> function, String str) {
        this.executor = eventExecutor;
        this.objectStore = tierObjectStore;
        this.logger = logContext.logger(getClass());
        this.topicIdPath = function;
        this.clusterId = str;
        scheduleListRemoteObjects();
    }

    private void scheduleListRemoteObjects() {
        Callable<Void> callable = () -> {
            try {
                Map<KRaftSnapshotObject, List<VersionInformation>> listObjects = KRaftSnapshotObjectUtils.listObjects(this.objectStore, false, "");
                this.context.remoteListRetries = 0;
                this.context.remoteObjects = Optional.of(new TreeSet(listObjects.keySet()));
                scheduleUploadLocalObjects();
                return null;
            } catch (TierObjectStoreRetriableException e) {
                this.context.remoteListRetries++;
                scheduleListRemoteObjects();
                return null;
            }
        };
        if (this.context.remoteListRetries == 0) {
            this.executor.submit(createEvent("ListRemoteObject", callable));
        } else {
            this.executor.schedule(createEvent("ListRemoteObject", callable), Math.min(10, this.context.remoteListRetries), TimeUnit.SECONDS);
        }
    }

    private Optional<LocalSnapshotObject> nextPendingLocal() {
        return this.context.remoteObjects.flatMap(sortedSet -> {
            this.context.localObjects.removeIf(localSnapshotObject -> {
                return sortedSet.contains(localSnapshotObject.snapshotObject());
            });
            return this.context.localObjects.isEmpty() ? Optional.empty() : Optional.of(this.context.localObjects.last());
        });
    }

    private void scheduleUploadLocalObjects() {
        if (this.context.uploadScheduled || !nextPendingLocal().isPresent()) {
            return;
        }
        Callable<Void> callable = () -> {
            this.context.uploadScheduled = false;
            try {
                Optional<LocalSnapshotObject> nextPendingLocal = nextPendingLocal();
                if (!nextPendingLocal.isPresent()) {
                    return null;
                }
                LocalSnapshotObject localSnapshotObject = nextPendingLocal.get();
                Optional<Path> apply = this.topicIdPath.apply(localSnapshotObject.topicIdPartition());
                if (!apply.isPresent()) {
                    throw new FileNotFoundException("logDir could not be found for object: " + nextPendingLocal);
                }
                KRaftSnapshotObjectUtils.putObject(this.objectStore, new TierObjectStore.KRaftSnapshotMetadata(localSnapshotObject.snapshotObject()), Snapshots.snapshotPath(apply.get(), localSnapshotObject.snapshotObject().snapshotId()).toFile());
                this.context.remotePutRetries = 0;
                this.context.remoteObjects.get().add(localSnapshotObject.snapshotObject());
                this.context.localObjects.remove(localSnapshotObject);
                scheduleUploadLocalObjects();
                return null;
            } catch (TierObjectStoreRetriableException e) {
                this.context.remotePutRetries++;
                scheduleUploadLocalObjects();
                return null;
            }
        };
        if (this.context.remotePutRetries == 0) {
            this.executor.submit(createEvent("PutRemoteObject", callable));
        } else {
            this.executor.schedule(createEvent("PutRemoteObject", callable), Math.min(10, this.context.remotePutRetries), TimeUnit.SECONDS);
        }
        this.context.uploadScheduled = true;
    }

    @Override // io.confluent.kafka.raft.SimpleRaftTracer, io.confluent.kafka.raft.RaftTracer
    public void nodeStartedUp(TopicIdPartition topicIdPartition, int i, OptionalInt optionalInt, OptionalLong optionalLong, long j, long j2, SortedSet<OffsetAndEpoch> sortedSet, OptionalInt optionalInt2, Set<Integer> set) {
        this.executor.submit(createEvent("ReplicaStarted", () -> {
            this.logger.info("replica for topic partition {} started with {}", topicIdPartition, sortedSet);
            Iterator it = sortedSet.iterator();
            while (it.hasNext()) {
                this.context.localObjects.add(new LocalSnapshotObject(new KRaftSnapshotObject(topicIdPartition.topicId(), topicIdPartition.partition(), this.clusterId, optionalInt.orElse(-1), 0L, (OffsetAndEpoch) it.next()), topicIdPartition));
            }
            scheduleUploadLocalObjects();
            return null;
        }));
    }

    @Override // io.confluent.kafka.raft.SimpleRaftTracer, io.confluent.kafka.raft.RaftTracer
    public void snapshotGenerated(TopicIdPartition topicIdPartition, int i, OptionalInt optionalInt, OptionalLong optionalLong, long j, long j2, OffsetAndEpoch offsetAndEpoch, OptionalInt optionalInt2, Set<Integer> set) {
        this.executor.submit(createEvent("SnapshotGenerated", () -> {
            this.logger.info("topic partition {} generated a snapshot at {}", topicIdPartition, offsetAndEpoch);
            this.context.localObjects.add(new LocalSnapshotObject(new KRaftSnapshotObject(topicIdPartition.topicId(), topicIdPartition.partition(), this.clusterId, optionalInt.orElse(-1), 0L, offsetAndEpoch), topicIdPartition));
            scheduleUploadLocalObjects();
            return null;
        }));
    }

    private Callable<Void> createEvent(String str, Callable<Void> callable) {
        return () -> {
            this.logger.info("handling: {}", str);
            try {
                return (Void) callable.call();
            } catch (Throwable th) {
                throw this.faultHandler.handleFault(str, th);
            }
        };
    }

    public static KRaftSnapshotManager create(EventExecutor eventExecutor, TierObjectStore tierObjectStore, LogContext logContext, Function<TopicIdPartition, Optional<Path>> function, String str) {
        return new KRaftSnapshotManager(eventExecutor, tierObjectStore, logContext, function, str);
    }
}
