package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.offsets.EncodingNotSupportedException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/state/PartitionMonitor.class */
/**
 * Keeps one {@link PartitionState} and one assignment epoch counter per {@link TopicPartition},
 * updating both from the consumer's rebalance callbacks.
 *
 * <p>Mutations of {@code partitionStates} performed by the rebalance callbacks, and lookups through
 * {@link #getState(TopicPartition)}, are guarded by the monitor of the {@code partitionStates} map.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PartitionMonitor<K, V> implements ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(PartitionMonitor.class);

    private final Consumer<K, V> consumer;
    private final ShardManager<K, V> sm;

    /**
     * Fraction of {@code OffsetMapCodecManager.DefaultMaxMetadataSize} above which an encoded offset
     * payload is still committed, but the partition is blocked from taking further records.
     * The name is kept as-is because the public accessors are derived from it.
     */
    private double USED_PAYLOAD_THRESHOLD_MULTIPLIER = 0.75d;

    /** State per partition; absent for partitions that have no state loaded. */
    private final Map<TopicPartition, PartitionState<K, V>> partitionStates = new HashMap<>();

    /** Counter per partition, bumped on every assignment and every removal. Entries are never removed. */
    private final Map<TopicPartition, Integer> partitionsAssignmentEpochs = new HashMap<>();

    /**
     * @param consumer     consumer handed to {@code OffsetMapCodecManager} when loading and encoding offsets
     * @param shardManager shard manager told to drop work belonging to removed partitions
     */
    public PartitionMonitor(Consumer<K, V> consumer, ShardManager<K, V> shardManager) {
        this.consumer = consumer;
        this.sm = shardManager;
    }

    /**
     * Looks up the state for a partition.
     *
     * @param topicPartition partition to look up
     * @return the partition's state, or {@code null} when none is registered
     */
    public PartitionState<K, V> getState(TopicPartition topicPartition) {
        synchronized (this.partitionStates) {
            return this.partitionStates.get(topicPartition);
        }
    }

    /**
     * Bumps the epoch of every newly assigned partition and loads its state through
     * {@code OffsetMapCodecManager}. Any failure while loading is logged and rethrown.
     *
     * @param collection the partitions that were assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        log.debug("Partitions assigned: {}", collection);
        synchronized (this.partitionStates) {
            for (TopicPartition assigned : collection) {
                if (this.partitionStates.containsKey(assigned)) {
                    log.warn("New assignment of partition {} which already exists in partition state. Could be a state bug.", assigned);
                }
            }
            incrementPartitionAssignmentEpoch(collection);
            try {
                this.partitionStates.putAll(new OffsetMapCodecManager(this.consumer).loadOffsetMapForPartition(UniSets.copyOf(collection)));
            } catch (Exception e) {
                log.error("Error in onPartitionsAssigned", e);
                throw e;
            }
        }
    }

    /**
     * Removes state and work for the revoked partitions. Failures are logged and rethrown.
     *
     * @param collection the partitions that were revoked
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        log.info("Partitions revoked: {}", collection);
        try {
            onPartitionsRemoved(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsRevoked", e);
            throw e;
        }
    }

    /**
     * Shared handling for revoked and lost partitions: bumps their epochs, then drops their state
     * and the work that belonged to them.
     *
     * @param collection the partitions that are no longer owned
     */
    void onPartitionsRemoved(Collection<TopicPartition> collection) {
        synchronized (this.partitionStates) {
            incrementPartitionAssignmentEpoch(collection);
            resetOffsetMapAndRemoveWork(collection);
        }
    }

    /**
     * Removes state and work for the lost partitions. Failures are logged and rethrown.
     *
     * @param collection the partitions that were lost
     */
    @Override
    public void onPartitionsLost(Collection<TopicPartition> collection) {
        try {
            log.info("Lost partitions: {}", collection);
            onPartitionsRemoved(collection);
        } catch (Exception e) {
            log.error("Error in onPartitionsLost", e);
            throw e;
        }
    }

    /**
     * Forwards each committed offset to the state of its partition. Partitions that no longer have
     * any state registered are skipped instead of failing with a {@link NullPointerException}.
     *
     * @param map the offsets that were committed, keyed by partition
     */
    public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((topicPartition, offsetAndMetadata) -> {
            PartitionState<K, V> state = getState(topicPartition);
            if (state == null) {
                log.debug("Offset committed for partition {} which has no state, ignoring", topicPartition);
                return;
            }
            state.onOffsetCommitSuccess(offsetAndMetadata);
        });
    }

    /**
     * Drops the state of each given partition and asks the shard manager to remove the work that
     * was queued for it. Partitions without any registered state are skipped.
     */
    private void resetOffsetMapAndRemoveWork(Collection<TopicPartition> collection) {
        for (TopicPartition removedPartition : collection) {
            PartitionState<K, V> removedState = this.partitionStates.remove(removedPartition);
            if (removedState == null) {
                // Nothing was registered for this partition, so there is nothing to clean up.
                log.debug("No state found for removed partition {}, nothing to clean up", removedPartition);
                continue;
            }
            NavigableMap<Long, WorkContainer<K, V>> commitQueues = removedState.getCommitQueues();
            if (commitQueues != null) {
                this.sm.removeShardsFoundIn(commitQueues);
            } else {
                log.trace("Removing empty commit queue");
            }
        }
    }

    /**
     * Returns the current assignment epoch of the given partition.
     *
     * @param consumerRecord record being handled; only used in the error message
     * @param topicPartition partition whose epoch is wanted
     * @return the partition's current epoch
     * @throws InternalRuntimeError if no epoch has ever been recorded for the partition
     */
    public int getEpoch(ConsumerRecord<K, V> consumerRecord, TopicPartition topicPartition) {
        Integer epoch = this.partitionsAssignmentEpochs.get(topicPartition);
        if (epoch == null) {
            throw new InternalRuntimeError(StringUtils.msg("Received message for a partition which is not assigned: {}", consumerRecord));
        }
        return epoch;
    }

    /**
     * Returns the current assignment epoch of the partition the record came from.
     *
     * @param consumerRecord record whose partition epoch is wanted
     * @return the partition's current epoch
     * @throws InternalRuntimeError if no epoch has ever been recorded for the partition
     */
    public int getEpoch(ConsumerRecord<K, V> consumerRecord) {
        return getEpoch(consumerRecord, KafkaUtils.toTP(consumerRecord));
    }

    /** Bumps the epoch of each given partition by one; a partition's first epoch is {@code 0}. */
    private void incrementPartitionAssignmentEpoch(Collection<TopicPartition> collection) {
        for (TopicPartition topicPartition : collection) {
            int previousEpoch = this.partitionsAssignmentEpochs.getOrDefault(topicPartition, -1);
            this.partitionsAssignmentEpochs.put(topicPartition, previousEpoch + 1);
        }
    }

    /**
     * Tells whether the work container was created under an epoch other than its partition's
     * current one. A partition with no recorded epoch is reported as stale.
     *
     * @param workContainer the work to check
     * @return {@code true} when the epochs differ or no epoch is recorded for the partition
     */
    public boolean checkEpochIsStale(WorkContainer<K, V> workContainer) {
        Integer currentEpoch = this.partitionsAssignmentEpochs.get(workContainer.getTopicPartition());
        int workEpoch = workContainer.getEpoch();
        if (currentEpoch != null && currentEpoch.intValue() == workEpoch) {
            return false;
        }
        log.debug("Epoch mismatch {} vs {} for record {} - were partitions lost? Skipping message - it's already assigned to a different consumer (possibly me).", workEpoch, currentEpoch, workContainer);
        return true;
    }

    /** Offers the work container's offset to its partition state as a candidate highest seen offset. */
    private void maybeRaiseHighestSeenOffset(WorkContainer<K, V> workContainer) {
        maybeRaiseHighestSeenOffset(workContainer.getTopicPartition(), workContainer.offset());
    }

    /**
     * Offers an offset to the partition's state as a candidate highest seen offset.
     *
     * @param topicPartition partition the offset belongs to; must have state registered
     * @param j              the offset that was seen
     */
    public void maybeRaiseHighestSeenOffset(TopicPartition topicPartition, long j) {
        getState(topicPartition).maybeRaiseHighestSeenOffset(j);
    }

    /**
     * Asks the record's partition state whether the record was already processed.
     *
     * @param consumerRecord the record to check
     * @return the partition state's answer, or {@code false} when the partition has no state
     * @throws InternalRuntimeError if the partition has neither state nor a recorded epoch
     */
    public boolean isRecordPreviouslyProcessed(ConsumerRecord<K, V> consumerRecord) {
        TopicPartition tp = KafkaUtils.toTP(consumerRecord);
        PartitionState<K, V> state = getState(tp);
        if (state == null) {
            log.error("No state found for partition {}, presuming message never before been processed. Partition epoch: {}", tp, getEpoch(consumerRecord, tp));
            return false;
        }
        boolean previouslyProcessed = state.isRecordPreviouslyProcessed(consumerRecord);
        log.trace("Record {} previously seen? {}", consumerRecord.offset(), previouslyProcessed);
        return previouslyProcessed;
    }

    /**
     * @param topicPartition partition to query; must have state registered
     * @return whether the partition's state currently allows more records
     */
    public boolean isAllowedMoreRecords(TopicPartition topicPartition) {
        return getState(topicPartition).isAllowedMoreRecords();
    }

    /**
     * NOTE(review): iterates {@code partitionStates} without holding its monitor, unlike
     * {@link #getState(TopicPartition)} - confirm callers cannot race with a rebalance callback.
     *
     * @return {@code true} if at least one partition state reports work in its commit queue
     */
    public boolean hasWorkInCommitQueues() {
        for (PartitionState<K, V> state : this.partitionStates.values()) {
            if (state.hasWorkInCommitQueue()) {
                return true;
            }
        }
        return false;
    }

    /**
     * NOTE(review): iterates {@code partitionStates} without holding its monitor, unlike
     * {@link #getState(TopicPartition)} - confirm callers cannot race with a rebalance callback.
     *
     * @return the sum of the commit queue sizes over all partition states, {@code 0} when there are none
     */
    public long getNumberOfEntriesInPartitionQueues() {
        return this.partitionStates.values().stream()
                .mapToLong(PartitionState::getCommitQueueSize)
                .sum();
    }

    /** Sets whether the given partition (which must have state registered) may take more records. */
    private void setPartitionMoreRecordsAllowedToProcess(TopicPartition topicPartition, boolean z) {
        getState(topicPartition).setAllowedMoreRecords(z);
    }

    /**
     * @param topicPartition partition to query; must have state registered
     * @return the highest seen offset as reported by the partition's state
     */
    public Long getHighestSeenOffset(TopicPartition topicPartition) {
        return getState(topicPartition).getOffsetHighestSeen();
    }

    /**
     * Records the container's offset as seen and puts the container into its partition's commit
     * queue, keyed by offset.
     *
     * @param workContainer the work to register; its partition must have state registered
     */
    public void addWorkContainer(WorkContainer<K, V> workContainer) {
        maybeRaiseHighestSeenOffset(workContainer);
        TopicPartition topicPartition = workContainer.getTopicPartition();
        getState(topicPartition).getCommitQueues().put(workContainer.offset(), workContainer);
    }

    /**
     * @param topicPartition partition to query; must have state registered
     * @return the negation of {@link #isAllowedMoreRecords(TopicPartition)}
     */
    public boolean isBlocked(TopicPartition topicPartition) {
        return !isAllowedMoreRecords(topicPartition);
    }

    /**
     * Encodes the partition's incomplete offsets into the metadata of its entry in {@code map} and
     * decides whether the partition may take more records.
     *
     * <ul>
     *   <li>No incomplete offsets: the partition is unblocked and {@code map} is left untouched.</li>
     *   <li>Payload longer than {@code DefaultMaxMetadataSize}: the offset is committed without
     *       metadata and the partition is blocked.</li>
     *   <li>Payload longer than the threshold (max size times the multiplier): the payload is
     *       committed and the partition is blocked.</li>
     *   <li>Otherwise: the payload is committed and the partition is unblocked.</li>
     *   <li>Encoding not supported: the partition is blocked and {@code map} is left untouched.</li>
     * </ul>
     *
     * @param map            offsets about to be committed; the entry for the partition is replaced
     * @param topicPartition partition being encoded; must have state registered
     * @param linkedHashSet  incomplete offsets of the partition
     */
    public void addEncodedOffsets(Map<TopicPartition, OffsetAndMetadata> map, TopicPartition topicPartition, LinkedHashSet<Long> linkedHashSet) {
        if (linkedHashSet.isEmpty()) {
            setPartitionMoreRecordsAllowedToProcess(topicPartition, true);
            return;
        }

        // Use the offset already chosen for the commit, falling back to the first incomplete offset.
        OffsetAndMetadata existingCommit = map.get(topicPartition);
        long offsetToCommit = existingCommit == null ? linkedHashSet.iterator().next() : existingCommit.offset();

        OffsetMapCodecManager codecManager = new OffsetMapCodecManager(this.consumer);
        try {
            PartitionState<K, V> state = getState(topicPartition);
            state.setIncompleteOffsets(linkedHashSet);
            String payload = codecManager.makeOffsetMetadataPayload(offsetToCommit, state);
            int payloadSize = payload.length();
            double softLimit = OffsetMapCodecManager.DefaultMaxMetadataSize * this.USED_PAYLOAD_THRESHOLD_MULTIPLIER;

            boolean allowMoreRecords;
            OffsetAndMetadata toCommit;
            if (payloadSize > OffsetMapCodecManager.DefaultMaxMetadataSize) {
                allowMoreRecords = false;
                toCommit = new OffsetAndMetadata(offsetToCommit);
                log.warn("Offset map data too large (size: {}) to fit in metadata payload hard limit of {} - cannot include in commit. Warning: messages might be replayed on rebalance. See kafka.coordinator.group.OffsetConfig#DefaultMaxMetadataSize = {} and issue #47.", payloadSize, OffsetMapCodecManager.DefaultMaxMetadataSize, OffsetMapCodecManager.DefaultMaxMetadataSize);
            } else if (payloadSize > softLimit) {
                allowMoreRecords = false;
                toCommit = new OffsetAndMetadata(offsetToCommit, payload);
                log.warn("Payload size {} higher than threshold {}, but still lower than max {}. Will write payload, but will not allow further messages, in order to allow the offset data to shrink (via succeeding messages).", payloadSize, softLimit, OffsetMapCodecManager.DefaultMaxMetadataSize);
            } else {
                allowMoreRecords = true;
                toCommit = new OffsetAndMetadata(offsetToCommit, payload);
                log.debug("Payload size {} within threshold {}", payloadSize, softLimit);
            }
            setPartitionMoreRecordsAllowedToProcess(topicPartition, allowMoreRecords);
            map.put(topicPartition, toCommit);
        } catch (EncodingNotSupportedException e) {
            setPartitionMoreRecordsAllowedToProcess(topicPartition, false);
            log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance.", e);
        }
    }

    /**
     * @param consumerRecord record whose partition is checked
     * @return {@code true} when state is registered for the record's partition
     */
    public boolean isPartitionAssigned(ConsumerRecord<K, V> consumerRecord) {
        return getState(KafkaUtils.toTP(consumerRecord)) != null;
    }

    /** @return the current payload threshold multiplier */
    public double getUSED_PAYLOAD_THRESHOLD_MULTIPLIER() {
        return this.USED_PAYLOAD_THRESHOLD_MULTIPLIER;
    }

    /** @param d the new payload threshold multiplier */
    public void setUSED_PAYLOAD_THRESHOLD_MULTIPLIER(double d) {
        this.USED_PAYLOAD_THRESHOLD_MULTIPLIER = d;
    }

    /** @return the live, mutable map of partition states (not a copy) */
    public Map<TopicPartition, PartitionState<K, V>> getPartitionStates() {
        return this.partitionStates;
    }
}
