/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.image.loader;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.TraceContextRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.metadata.MetadataEncryptorFactory;
import org.apache.kafka.metadata.util.OpenTelemetryUtils;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;

public class MetadataBatchLoader {
    private final Logger log;
    private final Time time;
    private final FaultHandler faultHandler;
    private final MetadataUpdater callback;
    private final MetadataEncryptorFactory encryptorFactory;
    private final Function<String, String> nameToTenantCallback;
    private MetadataImage image;
    private MetadataDelta delta;
    private long lastOffset;
    private int lastEpoch;
    private long lastContainedLogTimeMs;
    private long numBytes;
    private int numBatches;
    private long totalBatchElapsedNs;
    private TransactionState transactionState;
    private OtelTracingState tracingState;
    private boolean hasSeenRecord;
    private Context traceContext;

    public MetadataBatchLoader(LogContext logContext, Time time, FaultHandler faultHandler, MetadataUpdater callback, MetadataEncryptorFactory encryptorFactory, Function<String, String> nameToTenantCallback) {
        this.log = logContext.logger(MetadataBatchLoader.class);
        this.time = time;
        this.faultHandler = faultHandler;
        this.callback = callback;
        this.encryptorFactory = encryptorFactory;
        this.nameToTenantCallback = nameToTenantCallback;
        this.resetToImage(MetadataImage.EMPTY);
        this.hasSeenRecord = false;
        this.traceContext = null;
    }

    public boolean hasSeenRecord() {
        return this.hasSeenRecord;
    }

    public final void resetToImage(MetadataImage image) {
        this.image = image;
        this.hasSeenRecord = !image.isEmpty();
        this.delta = new MetadataDelta.Builder().setImage(image).setMetadataEncryptorFactory(this.encryptorFactory).setNameToTenantCallback(this.nameToTenantCallback).build();
        this.mayResetTraceState();
        this.transactionState = TransactionState.NO_TRANSACTION;
        this.lastOffset = image.provenance().lastContainedOffset();
        this.lastEpoch = image.provenance().lastContainedEpoch();
        this.lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
        this.numBytes = 0L;
        this.numBatches = 0;
        this.totalBatchElapsedNs = 0L;
    }

    private void mayResetTraceState() {
        if (this.transactionState != TransactionState.NO_TRANSACTION || this.tracingState == OtelTracingState.ENDED_TRACE) {
            this.tracingState = OtelTracingState.NO_TRACE;
            this.traceContext = null;
        }
    }

    public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAndEpoch) {
        long startNs = this.time.nanoseconds();
        int indexWithinBatch = 0;
        this.lastContainedLogTimeMs = batch.appendTimestamp();
        this.lastEpoch = batch.epoch();
        for (ApiMessageAndVersion record : batch.records()) {
            try {
                this.replay(record);
            }
            catch (Throwable e) {
                this.faultHandler.handleFault("Error loading metadata log record from offset " + (batch.baseOffset() + (long)indexWithinBatch), e);
            }
            if (this.transactionState == TransactionState.STARTED_TRANSACTION && (indexWithinBatch > 0 || this.numBatches > 0)) {
                MetadataProvenance provenance = new MetadataProvenance(this.lastOffset, this.lastEpoch, this.lastContainedLogTimeMs, indexWithinBatch == 0);
                LogDeltaManifest manifest = LogDeltaManifest.newBuilder().provenance(provenance).leaderAndEpoch(leaderAndEpoch).numBatches(this.numBatches).elapsedNs(this.totalBatchElapsedNs).numBytes(this.numBytes).build();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us. The delta is {}.", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs()), provenance.isOffsetBatchAligned() ? "batch aligned" : "not batch aligned"});
                }
                this.applyDeltaAndUpdate(this.delta, manifest);
                this.transactionState = TransactionState.STARTED_TRANSACTION;
            } else if (indexWithinBatch > 0 || this.numBatches > 0) {
                if (this.tracingState == OtelTracingState.STARTED_TRACE) {
                    this.maybeFlushBatches(leaderAndEpoch, indexWithinBatch == 0);
                } else if (this.tracingState == OtelTracingState.ENDED_TRACE && this.traceContext != null) {
                    this.lastOffset = batch.baseOffset() + (long)indexWithinBatch;
                    this.maybeFlushBatches(leaderAndEpoch, this.lastOffset == batch.lastOffset());
                }
            }
            this.lastOffset = batch.baseOffset() + (long)indexWithinBatch;
            ++indexWithinBatch;
        }
        long elapsedNs = this.time.nanoseconds() - startNs;
        this.lastOffset = batch.lastOffset();
        this.numBytes += (long)batch.sizeInBytes();
        ++this.numBatches;
        this.totalBatchElapsedNs += elapsedNs;
        return this.totalBatchElapsedNs;
    }

    public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch, boolean isOffsetBatchAligned) {
        MetadataProvenance provenance = new MetadataProvenance(this.lastOffset, this.lastEpoch, this.lastContainedLogTimeMs, isOffsetBatchAligned);
        LogDeltaManifest manifest = LogDeltaManifest.newBuilder().provenance(provenance).leaderAndEpoch(leaderAndEpoch).numBatches(this.numBatches).elapsedNs(this.totalBatchElapsedNs).numBytes(this.numBytes).build();
        switch (this.transactionState.ordinal()) {
            case 1: 
            case 2: {
                this.log.debug("handleCommit: not publishing since a transaction starting at {} is still in progress. {} batch(es) processed so far.", (Object)this.image.offset(), (Object)this.numBatches);
                break;
            }
            case 4: {
                this.log.debug("handleCommit: publishing empty delta between {} and {} from {} batch(es) since a transaction was aborted", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches()});
                this.applyDeltaAndUpdate(new MetadataDelta.Builder().setImage(this.image).setMetadataEncryptorFactory(this.encryptorFactory).setNameToTenantCallback(this.nameToTenantCallback).build(), manifest);
                break;
            }
            case 0: 
            case 3: {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us. The delta is {}.", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs()), provenance.isOffsetBatchAligned() ? "batch aligned" : "not batch aligned"});
                }
                this.applyDeltaAndUpdate(this.delta, manifest);
            }
        }
    }

    private void replay(ApiMessageAndVersion record) {
        MetadataRecordType type = MetadataRecordType.fromId(record.message().apiKey());
        switch (type) {
            case BEGIN_TRANSACTION_RECORD: {
                if (this.transactionState == TransactionState.STARTED_TRANSACTION || this.transactionState == TransactionState.CONTINUED_TRANSACTION) {
                    throw new RuntimeException("Encountered BeginTransactionRecord while already in a transaction");
                }
                this.transactionState = TransactionState.STARTED_TRANSACTION;
                break;
            }
            case END_TRANSACTION_RECORD: {
                if (this.transactionState == TransactionState.CONTINUED_TRANSACTION || this.transactionState == TransactionState.STARTED_TRANSACTION) {
                    this.transactionState = TransactionState.ENDED_TRANSACTION;
                    break;
                }
                throw new RuntimeException("Encountered EndTransactionRecord without having seen a BeginTransactionRecord");
            }
            case ABORT_TRANSACTION_RECORD: {
                if (this.transactionState == TransactionState.CONTINUED_TRANSACTION || this.transactionState == TransactionState.STARTED_TRANSACTION) {
                    this.transactionState = TransactionState.ABORTED_TRANSACTION;
                    break;
                }
                throw new RuntimeException("Encountered AbortTransactionRecord without having seen a BeginTransactionRecord");
            }
            case TRACE_CONTEXT_RECORD: {
                if (this.transactionState != TransactionState.NO_TRANSACTION) break;
                this.traceContext = OpenTelemetryUtils.extractContext((TraceContextRecord)record.message());
                this.tracingState = OtelTracingState.STARTED_TRACE;
                break;
            }
            case NO_TRACE_CONTEXT_RECORD: {
                this.tracingState = OtelTracingState.ENDED_TRACE;
                break;
            }
            default: {
                switch (this.transactionState.ordinal()) {
                    case 1: {
                        this.transactionState = TransactionState.CONTINUED_TRANSACTION;
                        break;
                    }
                    case 3: 
                    case 4: {
                        this.transactionState = TransactionState.NO_TRANSACTION;
                        break;
                    }
                }
                switch (this.tracingState.ordinal()) {
                    case 1: {
                        this.tracingState = OtelTracingState.CONTINUED_TRACE;
                        break;
                    }
                    case 3: {
                        this.tracingState = OtelTracingState.NO_TRACE;
                        this.traceContext = null;
                        break;
                    }
                }
                this.hasSeenRecord = true;
                this.delta.replay(record.message(), record.version());
            }
        }
    }

    private void applyDeltaAndUpdate(MetadataDelta delta, LogDeltaManifest manifest) {
        try {
            this.image = delta.apply(manifest.provenance());
        }
        catch (Throwable e) {
            this.faultHandler.handleFault("Error generating new metadata image from metadata delta between offset " + this.image.offset() + " and " + manifest.provenance().lastContainedOffset(), e);
        }
        Context ctx = Context.current();
        if (this.tracingState != OtelTracingState.NO_TRACE && this.tracingState != OtelTracingState.STARTED_TRACE && this.traceContext != null) {
            ctx = this.traceContext;
        }
        try (Scope scp = ctx.makeCurrent();){
            this.callback.update(delta, this.image, manifest);
        }
        this.resetToImage(this.image);
    }

    public Context getTraceContext() {
        return this.traceContext;
    }

    @FunctionalInterface
    public static interface MetadataUpdater {
        public void update(MetadataDelta var1, MetadataImage var2, LogDeltaManifest var3);
    }

    static enum TransactionState {
        NO_TRANSACTION,
        STARTED_TRANSACTION,
        CONTINUED_TRANSACTION,
        ENDED_TRANSACTION,
        ABORTED_TRANSACTION;

    }

    static enum OtelTracingState {
        NO_TRACE,
        STARTED_TRACE,
        CONTINUED_TRACE,
        ENDED_TRACE;

    }
}

