/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.raft;

import io.confluent.kafka.concurrent.EventExecutor;
import io.confluent.kafka.raft.SimpleRaftTracer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import kafka.server.KafkaRaftServer;
import kafka.tier.exceptions.E2EChecksumInvalidException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.raft.KRaftSnapshotMetrics;
import kafka.tier.raft.KRaftSnapshotObject;
import kafka.tier.raft.KRaftSnapshotObjectUtils;
import kafka.tier.raft.LocalSnapshotObject;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.VersionInformation;
import kafka.tier.store.objects.metadata.KRaftSnapshotMetadata;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.RecordsIterator;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.LoggingFaultHandler;
import org.apache.kafka.snapshot.FileRawSnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

public final class KRaftSnapshotManager
implements SimpleRaftTracer {
    private final EventExecutor executor;
    private final TierObjectStore objectStore;
    private final KRaftSnapshotMetrics metrics;
    private final Logger logger;
    private final Function<TopicIdPartition, Optional<Path>> topicIdPath;
    private final FaultHandler faultHandler;
    private final String clusterId;
    private final int nodeId;
    private final Supplier<Boolean> deleteEnable;
    private final Supplier<Long> retentionMs;
    private final Time time;
    public static final String KEY_PREFIX = "";
    private static final long MAX_REMOTE_RETRY_SECONDS = TimeUnit.MINUTES.toSeconds(1L);
    final Context context = new Context();

    private KRaftSnapshotManager(EventExecutor executor, TierObjectStore objectStore, KRaftSnapshotMetrics metrics, LogContext logContext, Function<TopicIdPartition, Optional<Path>> topicIdPath, String clusterId, int nodeId, Supplier<Boolean> deleteEnable, Supplier<Long> retentionMs, Time time) {
        this.executor = executor;
        this.objectStore = objectStore;
        this.metrics = metrics;
        this.logger = logContext.logger(this.getClass());
        this.topicIdPath = topicIdPath;
        this.faultHandler = new LoggingFaultHandler("raftSnapshotManager", () -> {});
        this.clusterId = clusterId;
        this.nodeId = nodeId;
        this.deleteEnable = deleteEnable;
        this.retentionMs = retentionMs;
        this.time = time;
        this.scheduleListRemoteObjects();
    }

    private void scheduleListRemoteObjects() {
        Callable<Void> callable = () -> {
            try {
                Map<KRaftSnapshotObject, List<VersionInformation>> objects = KRaftSnapshotObjectUtils.listObjectsByNode(this.objectStore, false, KEY_PREFIX, KafkaRaftServer.MetadataTopicId(), KafkaRaftServer.MetadataPartition().partition(), this.clusterId, this.nodeId);
                this.context.remoteListRetries = 0;
                this.context.remoteObjects = Optional.of(new TreeSet<KRaftSnapshotObject>(objects.keySet()));
                if (this.deleteEnable.get().booleanValue()) {
                    this.scheduleDeleteRemoteObjects();
                }
                this.scheduleUploadLocalObjects();
            }
            catch (TierObjectStoreRetriableException e) {
                ++this.context.remoteListRetries;
                this.logger.warn("ListRemoteObject for snapshots failed " + this.context.remoteListRetries + " times", (Throwable)((Object)e));
                this.scheduleListRemoteObjects();
            }
            return null;
        };
        if (this.context.remoteListRetries == 0) {
            this.executor.submit(this.createEvent("ListRemoteObject", callable));
        } else {
            this.executor.schedule(this.createEvent("ListRemoteObject", callable), (long)Math.min(10, this.context.remoteListRetries), TimeUnit.SECONDS);
        }
    }

    private Optional<LocalSnapshotObject> nextPendingLocal() {
        return this.context.remoteObjects.flatMap(remoteObjects -> {
            this.context.localObjects.removeIf(object -> remoteObjects.contains(object.snapshotObject()));
            if (this.context.localObjects.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(this.context.localObjects.last());
        });
    }

    private void scheduleUploadLocalObjects() {
        if (!this.context.uploadScheduled && this.nextPendingLocal().isPresent()) {
            Callable<Void> callable = () -> {
                this.context.uploadScheduled = false;
                LocalSnapshotObject localObject = null;
                try {
                    Optional<LocalSnapshotObject> snapshotObject = this.nextPendingLocal();
                    if (!snapshotObject.isPresent()) {
                        return null;
                    }
                    localObject = snapshotObject.get();
                    Optional<Path> logDir = this.topicIdPath.apply(localObject.topicIdPartition());
                    if (!logDir.isPresent()) {
                        throw new FileNotFoundException("logDir could not be found for object: " + snapshotObject);
                    }
                    Path snapshotPath = Snapshots.snapshotPath((Path)logDir.get(), (OffsetAndEpoch)localObject.snapshotObject().snapshotId());
                    KRaftSnapshotMetadata snapshotMetadata = new KRaftSnapshotMetadata(localObject.snapshotObject());
                    this.ensureSnapshotNotCorrupted(logDir.get(), localObject.snapshotObject().snapshotId());
                    KRaftSnapshotManager.recordDuration(this.metrics.putLatencySensor(), () -> {
                        try {
                            KRaftSnapshotObjectUtils.putObject(this.objectStore, snapshotMetadata, snapshotPath.toFile());
                            return null;
                        }
                        catch (IOException e) {
                            throw this.faultHandler.handleFault("PutRemoteObject", (Throwable)e);
                        }
                    }, this.time);
                    this.metrics.updateLastUploadMs(this.time.milliseconds());
                    this.context.remotePutRetries = 0;
                    this.context.remoteObjects.get().add(localObject.snapshotObject());
                    this.context.localObjects.remove(localObject);
                    this.logger.info("PutRemoteObject success for snapshot: " + snapshotMetadata);
                    if (this.deleteEnable.get().booleanValue()) {
                        this.scheduleDeleteRemoteObject(snapshotMetadata);
                    }
                    this.scheduleUploadLocalObjects();
                }
                catch (CorruptRecordException e) {
                    this.logger.error("Detected on disk corruption of local KRaft snapshot file, skipping upload.", (Throwable)e);
                    this.metrics.incrementOnDiskCorruption();
                    if (Objects.nonNull(localObject)) {
                        this.context.localObjects.remove(localObject);
                    }
                }
                catch (E2EChecksumInvalidException | TierObjectStoreRetriableException e) {
                    ++this.context.remotePutRetries;
                    this.logger.warn("PutRemoteObject for snapshots failed " + this.context.remotePutRetries + " times", (Throwable)e);
                    if (e instanceof E2EChecksumInvalidException) {
                        this.metrics.incrementOnNetworkCorruption();
                    }
                    this.scheduleUploadLocalObjects();
                }
                return null;
            };
            if (this.context.remotePutRetries == 0) {
                this.executor.submit(this.createEvent("PutRemoteObject", callable));
            } else {
                this.executor.schedule(this.createEvent("PutRemoteObject", callable), (long)Math.min(10, this.context.remotePutRetries), TimeUnit.SECONDS);
            }
            this.context.uploadScheduled = true;
        }
    }

    private void ensureSnapshotNotCorrupted(Path logDir, OffsetAndEpoch snapshotId) {
        try (FileRawSnapshotReader reader = FileRawSnapshotReader.open((Path)logDir, (OffsetAndEpoch)snapshotId);){
            RecordsIterator recordsIterator = new RecordsIterator(reader.records(), (RecordSerde)MetadataRecordSerde.INSTANCE, BufferSupplier.create(), 0x800000, true);
            while (recordsIterator.hasNext()) {
                recordsIterator.next();
            }
        }
    }

    private void scheduleDeleteRemoteObject(KRaftSnapshotMetadata snapshotMetadata) {
        Callable<Void> callable = () -> {
            try {
                KRaftSnapshotManager.recordDuration(this.metrics.deleteLatencySensor(), () -> {
                    KRaftSnapshotObjectUtils.deleteObject(this.objectStore, snapshotMetadata, KEY_PREFIX);
                    return null;
                }, this.time);
                this.context.remoteObjects.get().remove(snapshotMetadata.snapshotObject());
                this.context.remoteDeleteRetries = 0;
                this.logger.info("DeleteRemoteObject success for snapshot: " + snapshotMetadata);
            }
            catch (TierObjectStoreRetriableException e) {
                ++this.context.remoteDeleteRetries;
                this.logger.warn("DeleteRemoteObject for snapshots failed " + this.context.remoteDeleteRetries + " times", (Throwable)((Object)e));
            }
            return null;
        };
        long timeRemainingToDeleteSeconds = Math.max(0L, TimeUnit.MILLISECONDS.toSeconds(snapshotMetadata.snapshotObject().appendTimeStampMs() + this.retentionMs.get() - this.time.milliseconds()));
        long retryDelaySeconds = Math.min(MAX_REMOTE_RETRY_SECONDS, (long)this.context.remoteDeleteRetries);
        this.executor.schedule(this.createEvent("DeleteRemoteObject", callable), timeRemainingToDeleteSeconds + retryDelaySeconds, TimeUnit.SECONDS);
    }

    private void scheduleDeleteRemoteObjects() {
        this.context.remoteObjects.get().forEach(snapshotObject -> this.scheduleDeleteRemoteObject(new KRaftSnapshotMetadata((KRaftSnapshotObject)snapshotObject)));
    }

    public void nodeStartedUp(TopicIdPartition tpId, int epoch, OptionalInt nodeId, OptionalLong hwm, long logStartOffset, long logEndOffset, SortedSet<OffsetAndEpoch> snapshotIds, OptionalInt leaderId, Set<Integer> currentVoters) {
        this.executor.submit(this.createEvent("ReplicaStarted", () -> {
            this.logger.info("replica for topic partition {} started with {}", (Object)tpId, (Object)snapshotIds);
            for (OffsetAndEpoch snapshot : snapshotIds) {
                this.maybeReadMaxSnapshotTimestamp(this.topicIdPath, tpId, snapshot).map(ts -> this.context.localObjects.add(new LocalSnapshotObject(new KRaftSnapshotObject(tpId.topicId(), tpId.partition(), this.clusterId, nodeId.orElse(-1), (long)ts, snapshot), tpId)));
            }
            this.scheduleUploadLocalObjects();
            return null;
        }));
    }

    public void snapshotGenerated(TopicIdPartition tpId, int epoch, OptionalInt nodeId, OptionalLong hwm, long logStartOffset, long logEndOffset, OffsetAndEpoch newSnapshotId, OptionalInt leaderId, Set<Integer> currentVoters) {
        this.executor.submit(this.createEvent("SnapshotGenerated", () -> {
            this.logger.info("topic partition {} generated a snapshot at {}", (Object)tpId, (Object)newSnapshotId);
            this.maybeReadMaxSnapshotTimestamp(this.topicIdPath, tpId, newSnapshotId).map(ts -> this.context.localObjects.add(new LocalSnapshotObject(new KRaftSnapshotObject(tpId.topicId(), tpId.partition(), this.clusterId, nodeId.orElse(-1), (long)ts, newSnapshotId), tpId)));
            this.scheduleUploadLocalObjects();
            return null;
        }));
    }

    Optional<Long> maybeReadMaxSnapshotTimestamp(Function<TopicIdPartition, Optional<Path>> topicIdPath, TopicIdPartition tpId, OffsetAndEpoch snapshotId) {
        try {
            return topicIdPath.apply(tpId).map(path -> Snapshots.lastContainedLogTimestamp((Path)path, (OffsetAndEpoch)snapshotId));
        }
        catch (Exception e) {
            this.logger.error("Could not read snapshot: " + snapshotId, (Throwable)e);
            this.metrics.updateError(true);
            return Optional.empty();
        }
    }

    private Callable<Void> createEvent(String message, Callable<Void> callable) {
        return () -> {
            this.logger.debug("handling: {}", (Object)message);
            try {
                return (Void)callable.call();
            }
            catch (Throwable e) {
                throw this.faultHandler.handleFault(message, e);
            }
        };
    }

    static <T> T recordDuration(Sensor sensor, Supplier<T> supplier, Time time) {
        long startTimeMs = time.milliseconds();
        T result = supplier.get();
        long endTimeMs = time.milliseconds();
        sensor.record((double)(endTimeMs - startTimeMs));
        return result;
    }

    public static KRaftSnapshotManager create(EventExecutor executor, TierObjectStore objectStore, KRaftSnapshotMetrics metrics, LogContext logContext, Function<TopicIdPartition, Optional<Path>> topicIdPath, String clusterId, int nodeId, Supplier<Boolean> deleteEnable, Supplier<Long> retentionMs, Time time) {
        return new KRaftSnapshotManager(executor, objectStore, metrics, logContext, topicIdPath, clusterId, nodeId, deleteEnable, retentionMs, time);
    }

    static final class Context {
        int remoteListRetries = 0;
        Optional<SortedSet<KRaftSnapshotObject>> remoteObjects = Optional.empty();
        boolean uploadScheduled = false;
        int remotePutRetries = 0;
        final SortedSet<LocalSnapshotObject> localObjects = new TreeSet<LocalSnapshotObject>();
        int remoteDeleteRetries = 0;

        Context() {
        }
    }
}

