/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.replica;

import io.confluent.kafka.replication.push.ReplicationState;
import io.confluent.kafka.replication.push.ReplicationStateMetadata;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedReplicationSessionIdException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.replica.ReplicaState;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the leader-side view of one replica (identified by broker id) of a topic partition.
 *
 * <p>The view is stored as a single {@link ReplicaState} snapshot inside an
 * {@link AtomicReference}. Every mutator replaces the snapshot as a whole through
 * {@link AtomicReference#updateAndGet}, so readers of {@link #stateSnapshot()} always observe
 * one consistent snapshot.
 *
 * <p>Because {@code updateAndGet} may re-apply its function when a concurrent update wins the
 * race, the update lambdas in this class (including their logging and metadata-cache lookups)
 * may be evaluated more than once per call.
 */
public class Replica {
    private static final Logger LOGGER = LoggerFactory.getLogger(Replica.class);

    private final int brokerId;
    private final TopicPartition topicPartition;
    private final MetadataCache metadataCache;
    private final AtomicReference<ReplicaState> replicaState;

    /**
     * Creates a replica whose state starts as {@link ReplicaState#EMPTY}.
     *
     * @param brokerId       id of the broker hosting this replica
     * @param topicPartition partition this replica belongs to
     * @param metadataCache  cache consulted for the broker's alive epoch when fencing fetch updates
     */
    public Replica(int brokerId, TopicPartition topicPartition, MetadataCache metadataCache) {
        this.brokerId = brokerId;
        this.topicPartition = topicPartition;
        this.metadataCache = metadataCache;
        this.replicaState = new AtomicReference<>(ReplicaState.EMPTY);
    }

    /**
     * Returns the current state snapshot held by this replica.
     *
     * @return the {@link ReplicaState} currently stored in the atomic reference
     */
    public ReplicaState stateSnapshot() {
        return this.replicaState.get();
    }

    /**
     * Returns the id of the broker hosting this replica.
     *
     * @return the broker id given at construction
     */
    public int brokerId() {
        return this.brokerId;
    }

    /**
     * Applies a follower fetch to the replica state, rejecting stale or inconsistent updates.
     *
     * <p>When accepted, the new state records the follower's start offset, fetch offset metadata
     * and fetch time, the larger of {@code leaderEndOffset} and the previously recorded leader
     * end offset, a recomputed last-caught-up time, and {@code brokerEpoch}. If the current state
     * has no broker epoch or an older one, the replication session state is replaced by a new
     * {@code PULL} session carrying {@code replicationSessionId}; otherwise the current session
     * state is kept.
     *
     * @param followerFetchOffsetMetadata offset metadata of the follower's fetch position
     * @param followerStartOffset         the follower's log start offset
     * @param followerFetchTimeMs         time of the fetch, in milliseconds
     * @param leaderEndOffset             the leader's log end offset at fetch time
     * @param brokerEpoch                 broker epoch carried by the fetch; {@code -1} skips the
     *                                    check against the metadata cache
     * @param replicationSessionId        replication session id carried by the fetch
     * @throws NotLeaderOrFollowerException        if {@code brokerEpoch} is not {@code -1} and the
     *                                             metadata cache holds a greater alive epoch for this broker
     * @throws FencedReplicationSessionIdException if the current state has the same broker epoch
     *                                             but a greater replication session id
     * @throws IllegalStateException               if the broker epoch is newer than (or absent from)
     *                                             the current state while the current mode is not {@code PULL}
     */
    public void updateFetchStateOrThrow(LogOffsetMetadata followerFetchOffsetMetadata, long followerStartOffset, long followerFetchTimeMs, long leaderEndOffset, long brokerEpoch, long replicationSessionId) {
        this.replicaState.updateAndGet(currentReplicaState -> {
            // Typed (not raw) so the epoch comparison in the filter compiles and stays numeric.
            Optional<Long> cachedBrokerEpoch = this.metadataCache.getAliveBrokerEpoch(this.brokerId);
            if (brokerEpoch != -1L && cachedBrokerEpoch.filter(e -> e > brokerEpoch).isPresent()) {
                // NOTE(review): "expected=" reports the replica state's epoch, while the check above
                // compares against the metadata cache's epoch - confirm this is intended.
                throw new NotLeaderOrFollowerException("Received stale fetch state update. broker epoch=" + brokerEpoch + " vs expected=" + currentReplicaState.brokerEpoch());
            }

            Optional<Long> currentBrokerEpoch = currentReplicaState.brokerEpoch();
            ReplicationStateMetadata currentSessionState = currentReplicaState.replicationSessionState();

            // Same broker epoch but an older session id: the sender has been fenced.
            if (currentBrokerEpoch.isPresent() && currentBrokerEpoch.get().equals(brokerEpoch) && currentSessionState.replicationSessionId() > replicationSessionId) {
                throw new FencedReplicationSessionIdException(String.format("Received stale fetch state update with replica epoch=%d and replication session ID=%d. Current replica state %s", brokerEpoch, replicationSessionId, currentReplicaState));
            }

            ReplicationStateMetadata replicationSessionState;
            if (!currentBrokerEpoch.isPresent() || currentBrokerEpoch.get() < brokerEpoch) {
                // A newer broker epoch may only arrive through a fetch while the replica is in PULL mode.
                if (!currentSessionState.mode().equals(ReplicationState.Mode.PULL)) {
                    throw new IllegalStateException(String.format("Unexpected fetch state update attempted with newer replica epoch=%d and replication session ID=%d while current replica state %s is not in mode=%s", brokerEpoch, replicationSessionId, currentReplicaState, ReplicationState.Mode.PULL));
                }
                LOGGER.info("Received fetch state update with newer replica epoch={} and replication session ID={}. Current replica state {} will be updated accordingly.", brokerEpoch, replicationSessionId, currentReplicaState);
                replicationSessionState = new ReplicationStateMetadata(ReplicationState.Mode.PULL, replicationSessionId);
            } else {
                replicationSessionState = currentSessionState;
            }

            long lastCaughtUpTime = nextLastCaughtUpTimeMs(currentReplicaState, followerFetchOffsetMetadata.messageOffset, followerFetchTimeMs, leaderEndOffset);
            return new ReplicaState(
                followerStartOffset,
                followerFetchOffsetMetadata,
                Math.max(leaderEndOffset, currentReplicaState.lastFetchLeaderLogEndOffset()),
                followerFetchTimeMs,
                lastCaughtUpTime,
                Optional.of(brokerEpoch),
                replicationSessionState);
        });
    }

    /**
     * Computes the last-caught-up time that results from a follower fetch.
     *
     * <ul>
     *   <li>Fetch offset at or beyond the leader's current end offset: the later of the existing
     *       last-caught-up time and this fetch's time.</li>
     *   <li>Fetch offset at or beyond the leader end offset recorded at the previous fetch: the
     *       later of the existing last-caught-up time and the previous fetch's time.</li>
     *   <li>Otherwise: the existing last-caught-up time, unchanged.</li>
     * </ul>
     */
    private static long nextLastCaughtUpTimeMs(ReplicaState current, long followerFetchOffset, long followerFetchTimeMs, long leaderEndOffset) {
        if (followerFetchOffset >= leaderEndOffset) {
            return Math.max(current.lastCaughtUpTimeMs(), followerFetchTimeMs);
        }
        if (followerFetchOffset >= current.lastFetchLeaderLogEndOffset()) {
            return Math.max(current.lastCaughtUpTimeMs(), current.lastFetchTimeMs());
        }
        return current.lastCaughtUpTimeMs();
    }

    /**
     * Replaces the broker epoch and replication session state while keeping all offset and
     * timing fields of the current state.
     *
     * <p>For {@code PUSH} the transition is validated with
     * {@code ReplicaState.canTransitionToPush}; for any other mode with
     * {@code ReplicaState.canUpdateBrokerEpochOrReplicationSession}.
     *
     * @param brokerEpoch          broker epoch to record
     * @param replicationSessionId replication session id to record
     * @param replicationMode      replication mode to record
     * @throws IllegalStateException if the current state does not permit the requested transition
     */
    public void updateReplicationSessionState(long brokerEpoch, long replicationSessionId, ReplicationState.Mode replicationMode) {
        this.replicaState.updateAndGet(currentReplicaState -> {
            boolean isValidReplicationSessionUpdate = replicationMode == ReplicationState.Mode.PUSH
                ? currentReplicaState.canTransitionToPush(brokerEpoch, replicationSessionId)
                : currentReplicaState.canUpdateBrokerEpochOrReplicationSession(brokerEpoch, replicationSessionId);
            if (!isValidReplicationSessionUpdate) {
                throw new IllegalStateException(String.format("Illegal state transition attempted from current replica state %s to new replica state with [brokerEpoch=%d, replicationSessionId=%d, mode=%s]", currentReplicaState, brokerEpoch, replicationSessionId, replicationMode));
            }
            return new ReplicaState(
                currentReplicaState.logStartOffset(),
                currentReplicaState.logEndOffsetMetadata(),
                currentReplicaState.lastFetchLeaderLogEndOffset(),
                currentReplicaState.lastFetchTimeMs(),
                currentReplicaState.lastCaughtUpTimeMs(),
                Optional.of(brokerEpoch),
                new ReplicationStateMetadata(replicationMode, replicationSessionId));
        });
    }

    /**
     * Reinitializes the replica state and resets the replication session state to
     * {@link ReplicationStateMetadata#INITIAL}.
     *
     * <p>In both cases the last-caught-up time becomes {@code currentTimeMs} when
     * {@code isFollowerInSync} is true and {@code 0} otherwise.
     * <ul>
     *   <li>{@code isNewLeader}: offsets are reset to {@code -1} /
     *       {@link LogOffsetMetadata#UNKNOWN_OFFSET_METADATA}, the last fetch time to {@code 0},
     *       and the broker epoch is cleared.</li>
     *   <li>otherwise: the log start offset, log end offset metadata and broker epoch are kept,
     *       the recorded leader end offset becomes {@code leaderEndOffset}, and the last fetch
     *       time becomes {@code currentTimeMs} if the follower is in sync, else {@code 0}.</li>
     * </ul>
     *
     * @param currentTimeMs    current time in milliseconds
     * @param leaderEndOffset  the leader's log end offset
     * @param isNewLeader      whether the full reset described above should be applied
     * @param isFollowerInSync whether the follower is currently considered in sync
     */
    public void resetReplicaState(long currentTimeMs, long leaderEndOffset, boolean isNewLeader, boolean isFollowerInSync) {
        this.replicaState.updateAndGet(currentReplicaState -> {
            long lastCaughtUpTimeMs = isFollowerInSync ? currentTimeMs : 0L;
            if (isNewLeader) {
                return new ReplicaState(
                    -1L,
                    LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
                    -1L,
                    0L,
                    lastCaughtUpTimeMs,
                    Optional.empty(),
                    ReplicationStateMetadata.INITIAL);
            }
            return new ReplicaState(
                currentReplicaState.logStartOffset(),
                currentReplicaState.logEndOffsetMetadata(),
                leaderEndOffset,
                isFollowerInSync ? currentTimeMs : 0L,
                lastCaughtUpTimeMs,
                currentReplicaState.brokerEpoch(),
                ReplicationStateMetadata.INITIAL);
        });
        LOGGER.trace("Reset state of replica to {}", this);
    }

    /**
     * Conditionally advances the replica's timing fields and recorded leader end offset.
     *
     * <p>The state is replaced only when all of the following hold: the replica's log end offset
     * equals the leader end offset recorded at its last fetch, that offset is below
     * {@code leaderEndOffset}, and the last-caught-up time is positive. In that case the recorded
     * leader end offset becomes {@code leaderEndOffset}, and both the last fetch time and the
     * last-caught-up time become the later of their current value and {@code currentTimeMs}.
     * Otherwise the state is left unchanged.
     *
     * @param currentTimeMs   current time in milliseconds
     * @param leaderEndOffset the leader's log end offset
     */
    public void maybeResetLastCaughtUpTime(long currentTimeMs, long leaderEndOffset) {
        this.replicaState.updateAndGet(currentReplicaState -> {
            long logEndOffset = currentReplicaState.logEndOffset();
            boolean wasCaughtUpAtLastFetch = logEndOffset == currentReplicaState.lastFetchLeaderLogEndOffset();
            boolean isBehindLeader = logEndOffset < leaderEndOffset;
            if (wasCaughtUpAtLastFetch && isBehindLeader && currentReplicaState.lastCaughtUpTimeMs() > 0L) {
                return new ReplicaState(
                    currentReplicaState.logStartOffset(),
                    currentReplicaState.logEndOffsetMetadata(),
                    leaderEndOffset,
                    Math.max(currentTimeMs, currentReplicaState.lastFetchTimeMs()),
                    Math.max(currentTimeMs, currentReplicaState.lastCaughtUpTimeMs()),
                    currentReplicaState.brokerEpoch(),
                    currentReplicaState.replicationSessionState());
            }
            return currentReplicaState;
        });
    }

    /**
     * Renders the replica id, partition, and the fields of one state snapshot.
     * An absent broker epoch is printed as {@code -2}.
     */
    @Override
    public String toString() {
        // Read the reference once so every printed field comes from the same snapshot.
        ReplicaState snapshot = this.replicaState.get();
        return "Replica(replicaId=" + this.brokerId
            + ", topic=" + this.topicPartition.topic()
            + ", partition=" + this.topicPartition.partition()
            + ", lastCaughtUpTimeMs=" + snapshot.lastCaughtUpTimeMs()
            + ", logStartOffset=" + snapshot.logStartOffset()
            + ", logEndOffset=" + snapshot.logEndOffsetMetadata().messageOffset
            + ", logEndOffsetMetadata=" + snapshot.logEndOffsetMetadata()
            + ", lastFetchLeaderLogEndOffset=" + snapshot.lastFetchLeaderLogEndOffset()
            + ", brokerEpoch=" + snapshot.brokerEpoch().orElse(-2L)
            + ", replicationSessionState=" + snapshot.replicationSessionState()
            + ", lastFetchTimeMs=" + snapshot.lastFetchTimeMs()
            + ")";
    }

    /**
     * Two replicas are equal when they have the same broker id and topic partition;
     * the mutable state snapshot does not take part in equality.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        Replica other = (Replica) o;
        return this.brokerId == other.brokerId && this.topicPartition.equals(other.topicPartition);
    }

    /**
     * Hash over the same fields used by {@link #equals(Object)}: topic partition and broker id.
     */
    @Override
    public int hashCode() {
        return 31 + this.topicPartition.hashCode() + 17 * this.brokerId;
    }
}

