/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.image.loader;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.MetadataBatchLoader;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.MetadataEncryptorFactory;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;

public class MetadataLoader
implements RaftClient.Listener<ApiMessageAndVersion>,
AutoCloseable {
    private static final String INITIALIZE_NEW_PUBLISHERS = "InitializeNewPublishers";
    private final Logger log;
    private final Time time;
    private final FaultHandler faultHandler;
    private final MetadataLoaderMetrics metrics;
    private final Supplier<OptionalLong> highWaterMarkAccessor;
    private final MetadataEncryptorFactory encryptorFactory;
    private final Function<String, String> nameToTenantCallback;
    private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers;
    private final LinkedHashMap<String, MetadataPublisher> publishers;
    private boolean catchingUp = true;
    private LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;
    private MetadataImage image;
    private final MetadataBatchLoader batchLoader;
    private final KafkaEventQueue eventQueue;

    private MetadataLoader(Time time, LogContext logContext, String threadNamePrefix, FaultHandler faultHandler, MetadataLoaderMetrics metrics, Supplier<OptionalLong> highWaterMarkAccessor, MetadataEncryptorFactory encryptorFactory, Function<String, String> nameToTenantCallback) {
        this.log = logContext.logger(MetadataLoader.class);
        this.time = time;
        this.faultHandler = faultHandler;
        this.metrics = metrics;
        this.highWaterMarkAccessor = highWaterMarkAccessor;
        this.encryptorFactory = encryptorFactory;
        this.nameToTenantCallback = nameToTenantCallback;
        this.uninitializedPublishers = new LinkedHashMap();
        this.publishers = new LinkedHashMap();
        this.image = MetadataImage.EMPTY;
        this.batchLoader = new MetadataBatchLoader(logContext, time, faultHandler, this::maybePublishMetadata, encryptorFactory, nameToTenantCallback);
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix + "metadata-loader-", (EventQueue.Event)new ShutdownEvent());
    }

    MetadataLoaderMetrics metrics() {
        return this.metrics;
    }

    private boolean stillNeedToCatchUp(String where, long offset) {
        if (!this.catchingUp) {
            this.log.trace("{}: we are not in the initial catching up state.", (Object)where);
            return false;
        }
        OptionalLong highWaterMark = this.highWaterMarkAccessor.get();
        if (highWaterMark.isEmpty()) {
            this.log.info("{}: the loader is still catching up because we still don't know the high water mark yet.", (Object)where);
            return true;
        }
        if (highWaterMark.getAsLong() - 1L > offset) {
            this.log.info("{}: The loader is still catching up because we have loaded up to offset " + offset + ", but the high water mark is {}", (Object)where, (Object)highWaterMark.getAsLong());
            return true;
        }
        if (!this.batchLoader.hasSeenRecord()) {
            this.log.info("{}: The loader is still catching up because we have not loaded a controller record as of offset " + offset + " and high water mark is {}", (Object)where, (Object)highWaterMark.getAsLong());
            return true;
        }
        this.log.info("{}: The loader finished catching up to the current high water mark of {}", (Object)where, (Object)highWaterMark.getAsLong());
        this.catchingUp = false;
        return false;
    }

    void scheduleInitializeNewPublishers(long delayNs) {
        this.eventQueue.scheduleDeferred(INITIALIZE_NEW_PUBLISHERS, (UnaryOperator)new EventQueue.EarliestDeadlineFunction(this.eventQueue.time().nanoseconds() + delayNs), () -> {
            try {
                this.initializeNewPublishers();
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Unhandled error initializing new publishers", e);
            }
        });
    }

    void initializeNewPublishers() {
        if (this.uninitializedPublishers.isEmpty()) {
            this.log.debug("InitializeNewPublishers: nothing to do.");
            return;
        }
        if (this.stillNeedToCatchUp("initializeNewPublishers", this.image.highestOffsetAndEpoch().offset())) {
            this.log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} because we are still catching up with quorum metadata. Rescheduling.", (Object)this.uninitializedPublisherNames());
            this.scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100L));
            return;
        }
        this.log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}", (Object)this.uninitializedPublisherNames());
        long startNs = this.time.nanoseconds();
        MetadataDelta delta = new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).setNameToTenantCallback(this.nameToTenantCallback).setMetadataEncryptorFactory(this.encryptorFactory).build();
        ImageReWriter writer = new ImageReWriter(delta);
        this.image.write(writer, new ImageWriterOptions.Builder(this.image.features().metadataVersionOrThrow()).setEligibleLeaderReplicasEnabled(this.image.features().isElrEnabled()).build());
        SnapshotManifest manifest = new SnapshotManifest(this.image.provenance(), this.time.nanoseconds() - startNs);
        Iterator<MetadataPublisher> iter = this.uninitializedPublishers.values().iterator();
        while (iter.hasNext()) {
            MetadataPublisher publisher = iter.next();
            iter.remove();
            try {
                this.log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}", (Object)publisher.name(), (Object)this.image.highestOffsetAndEpoch().offset());
                publisher.onMetadataUpdate(delta, this.image, manifest);
                publisher.onControllerChange(this.currentLeaderAndEpoch);
                this.publishers.put(publisher.name(), publisher);
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Unhandled error initializing " + publisher.name() + " with a snapshot at offset " + this.image.highestOffsetAndEpoch().offset(), e);
            }
        }
    }

    private String uninitializedPublisherNames() {
        return String.join((CharSequence)", ", this.uninitializedPublishers.keySet());
    }

    private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, LoaderManifest manifest) {
        this.image = image;
        if (this.stillNeedToCatchUp("maybePublishMetadata(" + manifest.type().toString() + ")", manifest.provenance().lastContainedOffset())) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("handleCommit: publishing new image with provenance {}.", (Object)image.provenance());
        }
        for (MetadataPublisher publisher : this.publishers.values()) {
            try {
                publisher.onMetadataUpdate(delta, image, manifest);
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Unhandled error publishing the new metadata image ending at " + manifest.provenance().lastContainedOffset() + " with publisher " + publisher.name(), e);
            }
        }
        this.metrics.updateLastAppliedImageProvenance(image.provenance());
        this.metrics.setCurrentMetadataVersion(image.features().metadataVersionOrThrow());
        if (!this.uninitializedPublishers.isEmpty()) {
            this.scheduleInitializeNewPublishers(0L);
        }
    }

    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        this.eventQueue.append(() -> {
            try (BatchReader twrVar0$ = reader;){
                while (reader.hasNext()) {
                    Batch batch = (Batch)reader.next();
                    long elapsedNs = this.batchLoader.loadBatch((Batch<ApiMessageAndVersion>)batch, this.currentLeaderAndEpoch);
                    this.metrics.updateBatchSize(batch.records().size());
                    this.metrics.updateBatchProcessingTimeNs(elapsedNs);
                }
                this.batchLoader.maybeFlushBatches(this.currentLeaderAndEpoch, true);
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Unhandled fault in MetadataLoader#handleCommit. Last image offset was " + this.image.offset(), e);
            }
        });
    }

    public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        this.eventQueue.append(() -> {
            try {
                long numLoaded = this.metrics.incrementHandleLoadSnapshotCount();
                String snapshotName = Snapshots.filenameFromSnapshotId((OffsetAndEpoch)reader.snapshotId());
                this.log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.", (Object)snapshotName, (Object)numLoaded);
                MetadataDelta delta = new MetadataDelta.Builder().setImage(this.image).setNameToTenantCallback(this.nameToTenantCallback).setMetadataEncryptorFactory(this.encryptorFactory).build();
                SnapshotManifest manifest = this.loadSnapshot(delta, reader);
                this.log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} and this snapshot in {} us.", new Object[]{snapshotName, this.image.provenance().lastContainedOffset(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs())});
                MetadataImage image = delta.apply(manifest.provenance());
                this.batchLoader.resetToImage(image);
                this.maybePublishMetadata(delta, image, manifest);
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Unhandled fault in MetadataLoader#handleLoadSnapshot. Snapshot offset was " + reader.lastContainedLogOffset(), e);
            }
            finally {
                reader.close();
            }
        });
    }

    SnapshotManifest loadSnapshot(MetadataDelta delta, SnapshotReader<ApiMessageAndVersion> reader) {
        long startNs = this.time.nanoseconds();
        int snapshotIndex = 0;
        while (reader.hasNext()) {
            Batch batch = (Batch)reader.next();
            for (ApiMessageAndVersion record : batch.records()) {
                try {
                    delta.replay(record.message(), record.version());
                }
                catch (Throwable e) {
                    this.faultHandler.handleFault("Error loading metadata log record " + snapshotIndex + " in snapshot at offset " + reader.lastContainedLogOffset(), e);
                }
                ++snapshotIndex;
            }
        }
        delta.finishSnapshot();
        MetadataProvenance provenance = new MetadataProvenance(reader.lastContainedLogOffset(), reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp(), true);
        return new SnapshotManifest(provenance, this.time.nanoseconds() - startNs);
    }

    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
        this.eventQueue.append(() -> {
            this.currentLeaderAndEpoch = leaderAndEpoch;
            for (MetadataPublisher publisher : this.publishers.values()) {
                try {
                    publisher.onControllerChange(this.currentLeaderAndEpoch);
                }
                catch (Throwable e) {
                    this.faultHandler.handleFault("Unhandled error publishing the new leader change to " + String.valueOf(this.currentLeaderAndEpoch) + " with publisher " + publisher.name(), e);
                }
            }
            this.metrics.setCurrentControllerId(leaderAndEpoch.leaderId().orElse(-1));
        });
    }

    public CompletableFuture<Void> installPublishers(List<? extends MetadataPublisher> newPublishers) {
        if (newPublishers.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.eventQueue.append(() -> {
            try {
                for (MetadataPublisher newPublisher : newPublishers) {
                    MetadataPublisher prev = this.publishers.get(newPublisher.name());
                    if (prev == null) {
                        prev = this.uninitializedPublishers.get(newPublisher.name());
                    }
                    if (prev == null) continue;
                    if (prev == newPublisher) {
                        throw this.faultHandler.handleFault("Attempted to install publisher " + newPublisher.name() + ", which is already installed.");
                    }
                    throw this.faultHandler.handleFault("Attempted to install a new publisher named " + newPublisher.name() + ", but there is already a publisher with that name.");
                }
                newPublishers.forEach(p -> this.uninitializedPublishers.put(p.name(), (MetadataPublisher)p));
                this.scheduleInitializeNewPublishers(0L);
                future.complete(null);
            }
            catch (Throwable e) {
                future.completeExceptionally(this.faultHandler.handleFault("Unhandled fault in MetadataLoader#installPublishers", e));
            }
        });
        return future;
    }

    void waitForAllEventsToBeHandled() throws Exception {
        CompletableFuture future = new CompletableFuture();
        this.eventQueue.append(() -> future.complete(null));
        future.get();
    }

    public CompletableFuture<Void> removeAndClosePublisher(MetadataPublisher publisher) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.eventQueue.append(() -> {
            try {
                if (!this.publishers.remove(publisher.name(), publisher) && !this.uninitializedPublishers.remove(publisher.name(), publisher)) {
                    throw this.faultHandler.handleFault("Attempted to remove publisher " + publisher.name() + ", which is not installed.");
                }
                this.closePublisher(publisher);
                future.complete(null);
            }
            catch (Throwable e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public long lastAppliedOffset() {
        return this.metrics.lastAppliedOffset();
    }

    public void beginShutdown() {
        this.eventQueue.beginShutdown("beginShutdown");
    }

    Time time() {
        return this.time;
    }

    private void closePublisher(MetadataPublisher publisher) {
        try {
            publisher.close();
        }
        catch (Throwable e) {
            this.faultHandler.handleFault("Got unexpected exception while closing publisher " + publisher.name(), e);
        }
    }

    @Override
    public void close() throws Exception {
        this.beginShutdown();
        this.eventQueue.close();
    }

    class ShutdownEvent
    implements EventQueue.Event {
        ShutdownEvent() {
        }

        public void run() throws Exception {
            Iterator<MetadataPublisher> iter = MetadataLoader.this.uninitializedPublishers.values().iterator();
            while (iter.hasNext()) {
                MetadataLoader.this.closePublisher(iter.next());
                iter.remove();
            }
            iter = MetadataLoader.this.publishers.values().iterator();
            while (iter.hasNext()) {
                MetadataLoader.this.closePublisher(iter.next());
                iter.remove();
            }
        }
    }

    public static class Builder {
        private int nodeId = -1;
        private String threadNamePrefix = "";
        private Time time = Time.SYSTEM;
        private LogContext logContext = null;
        private FaultHandler faultHandler = FaultHandlerException::new;
        private MetadataLoaderMetrics metrics = null;
        private Supplier<OptionalLong> highWaterMarkAccessor = null;
        private MetadataEncryptorFactory encryptorFactory = null;
        private Function<String, String> nameToTenantCallback = __ -> null;

        public Builder setNodeId(int nodeId) {
            this.nodeId = nodeId;
            return this;
        }

        public Builder setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
            return this;
        }

        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        public Builder setFaultHandler(FaultHandler faultHandler) {
            this.faultHandler = faultHandler;
            return this;
        }

        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
            this.highWaterMarkAccessor = highWaterMarkAccessor;
            return this;
        }

        public Builder setMetrics(MetadataLoaderMetrics metrics) {
            this.metrics = metrics;
            return this;
        }

        public Builder setMetadataEncryptorFactory(MetadataEncryptorFactory encryptorFactory) {
            this.encryptorFactory = encryptorFactory;
            return this;
        }

        public Builder setNameToTenantCallback(Function<String, String> nameToTenantCallback) {
            this.nameToTenantCallback = nameToTenantCallback;
            return this;
        }

        public MetadataLoader build() {
            if (this.logContext == null) {
                this.logContext = new LogContext("[MetadataLoader id=" + this.nodeId + "] ");
            }
            if (this.highWaterMarkAccessor == null) {
                throw new RuntimeException("You must set the high water mark accessor.");
            }
            if (this.metrics == null) {
                this.metrics = new MetadataLoaderMetrics(Optional.empty(), __ -> {}, __ -> {}, new AtomicReference<MetadataProvenance>(MetadataProvenance.EMPTY));
            }
            if (this.encryptorFactory == null) {
                throw new RuntimeException("You must set the metadata encryptor factory.");
            }
            return new MetadataLoader(this.time, this.logContext, this.threadNamePrefix, this.faultHandler, this.metrics, this.highWaterMarkAccessor, this.encryptorFactory, this.nameToTenantCallback);
        }
    }
}

