package org.apache.kafka.shell;

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/shell/TrackingListener.class */
public class TrackingListener<T> implements RaftClient.Listener<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TrackingListener.class);
    private final CompletableFuture<Void> caughtUpFuture;
    private final Supplier<OptionalLong> highWaterMarkSupplier;
    private final RaftClient.Listener<T> underlying;

    public TrackingListener(CompletableFuture<Void> completableFuture, Supplier<OptionalLong> supplier, RaftClient.Listener<T> listener) {
        this.caughtUpFuture = completableFuture;
        this.highWaterMarkSupplier = supplier;
        this.underlying = listener;
    }

    @Override // org.apache.kafka.raft.RaftClient.Listener
    public void handleCommit(BatchReader<T> batchReader) {
        TrackingBatchReader trackingBatchReader = new TrackingBatchReader(batchReader);
        if (log.isTraceEnabled()) {
            log.trace("handleCommit(trackingReader.lastOffset={}, trackingReader.baseOffset={})", trackingBatchReader.lastOffset(), Long.valueOf(trackingBatchReader.baseOffset()));
        }
        this.underlying.handleCommit(trackingBatchReader);
        checkIfCaughtUp(trackingBatchReader.lastOffset().getAsLong() + 1);
    }

    @Override // org.apache.kafka.raft.RaftClient.Listener
    public void handleSnapshot(SnapshotReader<T> snapshotReader) {
        if (log.isTraceEnabled()) {
            log.trace("handleSnapshot(reader.lastContainedLogOffset={})", Long.valueOf(snapshotReader.lastContainedLogOffset()));
        }
        this.underlying.handleSnapshot(snapshotReader);
        checkIfCaughtUp(snapshotReader.lastContainedLogOffset() + 1);
    }

    @Override // org.apache.kafka.raft.RaftClient.Listener
    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
        if (log.isTraceEnabled()) {
            log.trace("handleLeaderChange(leader={})", leaderAndEpoch);
        }
        this.underlying.handleLeaderChange(leaderAndEpoch);
    }

    @Override // org.apache.kafka.raft.RaftClient.Listener
    public void beginShutdown() {
        if (log.isTraceEnabled()) {
            log.trace("beginShutdown");
        }
        this.underlying.beginShutdown();
    }

    private void checkIfCaughtUp(long j) {
        if (this.caughtUpFuture.isDone()) {
            return;
        }
        OptionalLong optionalLong = this.highWaterMarkSupplier.get();
        if (!optionalLong.isPresent()) {
            log.debug("High water mark is not known for RaftClient.");
            return;
        }
        long asLong = optionalLong.getAsLong();
        if (j < asLong) {
            log.debug("Offset {} has not yet caught up with high watermark {}.", Long.valueOf(j), Long.valueOf(asLong));
        } else {
            log.debug("Offset {} has caught up with high watermark {}.", Long.valueOf(j), Long.valueOf(asLong));
            this.caughtUpFuture.complete(null);
        }
    }
}
