package kafka.tier.backupobjectlifecycle;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import kafka.tier.backupobjectlifecycle.ObjectStoreUtils;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.AbstractTierSegmentMetadata;
import kafka.tier.domain.TierRecordType;
import kafka.tier.topic.TierTopicManager;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/tier/backupobjectlifecycle/TierTopicReader.class */
/**
 * Reads the tier topic partitions with a manually assigned consumer and extracts
 * {@code SegmentDeleteComplete} records as {@link ObjectStoreUtils.DeletionRecord}s.
 *
 * <p>On construction the consumer is assigned every tier topic partition, positioned either at the
 * offsets supplied in the configuration or at the offsets matching the configured look-back
 * window, and the partitions' end offsets are captured once. Subsequent calls to
 * {@link #deletedSegments()} consume up to (at most) that end-offset snapshot.
 *
 * <p>The instance owns the consumer; call {@link #maybeCloseConsumer()} when done with it.
 * NOTE(review): nothing in this class synchronizes access, so it is presumably meant to be used
 * from a single thread - confirm against the caller.
 */
public class TierTopicReader {
    private static final Logger log = LoggerFactory.getLogger(TierTopicReader.class);
    private final TierTopicReaderConfig config;
    private Consumer<byte[], byte[]> consumer;
    /** End offsets of every assigned partition, captured once in the constructor. */
    private Map<TopicPartition, Long> endOffsets;
    /** Last recorded consumer position, indexed by tier topic partition number. */
    private long[] currentPositions;

    /**
     * Creates the consumer, assigns all tier topic partitions, seeks to the starting offsets,
     * snapshots the end offsets and records the initial positions and lag.
     *
     * <p>If any step after the consumer has been created fails, the consumer is closed before the
     * failure is propagated, so a failed construction does not leak the consumer's resources.
     *
     * @param tierTopicReaderConfig configuration of this reader
     * @throws InterruptedException propagated from the retried consumer calls
     */
    public TierTopicReader(TierTopicReaderConfig tierTopicReaderConfig) throws InterruptedException {
        this.config = tierTopicReaderConfig;
        this.consumer = new KafkaConsumer<>(getProperties(tierTopicReaderConfig));
        try {
            Set<TopicPartition> partitions = TierTopicManager.partitions(Topic.TIER_TOPIC_NAME, tierTopicReaderConfig.tierMetadataNumPartitions);
            this.consumer.assign(partitions);
            seekConsumer();
            this.endOffsets = (Map) new RetryFramework(tierTopicReaderConfig.maxRetries).call(set -> {
                return this.consumer.endOffsets(set);
            }, partitions);
            this.currentPositions = new long[tierTopicReaderConfig.tierMetadataNumPartitions];
            recordConsumerPositionsAndLag();
        } catch (InterruptedException | RuntimeException e) {
            // Nobody can call maybeCloseConsumer() on an instance that was never returned.
            closeConsumerAfterFailedInit(e);
            throw e;
        }
    }

    /**
     * Closes the consumer after a failed construction; a failure of the close itself is attached
     * to {@code failure} as a suppressed exception so that the original cause is what propagates.
     */
    private void closeConsumerAfterFailedInit(Throwable failure) {
        try {
            this.consumer.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Verifies that the reader is still allowed to run.
     *
     * @throws RuntimeException if the configuration reports that CLM may no longer run
     * @throws InterruptedException if the current thread is interrupted (the interrupt flag is
     *     cleared by the check) or shutdown has been initiated
     */
    private void canTierTopicReaderRunElseThrow() throws InterruptedException {
        if (!this.config.canCLMRun.get().booleanValue()) {
            throw new RuntimeException("TierTopicReader will stop because CLM needs to stop");
        }
        if (Thread.interrupted() || this.config.isShutdownInitiated.get().booleanValue()) {
            throw new InterruptedException("TierTopicReader will stop because current thread is interrupted");
        }
    }

    /**
     * Polls the tier topic until either the end-offset snapshot has been reached on every
     * partition or at least {@code maxDeleteCompleteRecordsToConsumeInOneIteration} deletion
     * records have been collected.
     *
     * <p>Consumer positions and lag are recorded after every iteration, including a failing one.
     * A failure while recording never replaces the exception that caused the iteration to fail;
     * it is attached to that exception as suppressed instead.
     *
     * @return the deletion records collected by this call; possibly empty, never {@code null}
     * @throws InterruptedException if the thread is interrupted or shutdown has been initiated
     * @throws RuntimeException if CLM may no longer run or a record cannot be deserialized
     */
    public List<ObjectStoreUtils.DeletionRecord> deletedSegments() throws InterruptedException {
        List<ObjectStoreUtils.DeletionRecord> deletions = new ArrayList<>();
        do {
            Throwable primary = null;
            try {
                if (!hasMoreRecordsToConsume()) {
                    break;
                }
                canTierTopicReaderRunElseThrow();
                deletions.addAll(filterDeleteRecords(this.consumer.poll(Duration.ofMillis(this.config.tierMetadataMaxPollMs))));
            } catch (InterruptedException | RuntimeException e) {
                primary = e;
                throw e;
            } finally {
                if (this.consumer != null) {
                    recordPositionsPreservingFailure(primary);
                }
            }
        } while (deletions.size() < this.config.maxDeleteCompleteRecordsToConsumeInOneIteration.longValue());
        log.debug("TierTopicReader has consumed {} SEGMENT_DELETE_COMPLETE records", deletions.size());
        return deletions;
    }

    /**
     * Records positions and lag. When {@code primary} is non-null an exception is already
     * propagating, so a failure here is added to it as suppressed rather than thrown.
     */
    private void recordPositionsPreservingFailure(Throwable primary) throws InterruptedException {
        if (primary == null) {
            recordConsumerPositionsAndLag();
            return;
        }
        try {
            recordConsumerPositionsAndLag();
        } catch (InterruptedException secondary) {
            // The interruption is not rethrown here, so keep it observable through the flag.
            Thread.currentThread().interrupt();
            primary.addSuppressed(secondary);
        } catch (RuntimeException secondary) {
            primary.addSuppressed(secondary);
        }
    }

    /**
     * Returns the most recently recorded consumer positions, indexed by partition number.
     *
     * <p>This is the internal array, not a copy: it is updated in place whenever positions are
     * recorded again, and modifications made by the caller are visible to this reader.
     */
    public long[] currentPositions() {
        return this.currentPositions;
    }

    /**
     * Positions the consumer on every tier topic partition.
     *
     * <p>When the configuration supplies exactly one offset per partition, those offsets are used.
     * Otherwise each partition is sought to the offset returned by {@code offsetsForTimes} for
     * "now minus the configured look-back days", or to the beginning when no offset is returned
     * for that partition.
     */
    private void seekConsumer() throws InterruptedException {
        int numPartitions = this.config.tierMetadataNumPartitions;
        if (this.config.tierOffsets.size() == numPartitions) {
            log.debug("Consumer to seek to the offsets supplied by LifecycleManagerState");
            for (int partition = 0; partition < this.config.tierOffsets.size(); partition++) {
                this.consumer.seek(new TopicPartition(Topic.TIER_TOPIC_NAME, partition), this.config.tierOffsets.get(partition).longValue());
            }
            return;
        }

        // MILLIS_PER_DAY is a long, so the multiplication is carried out in long arithmetic.
        long lookBackMs = this.config.maxLookBackInDays.get().intValue() * DateUtils.MILLIS_PER_DAY;
        long seekTimestampMs = this.config.time.milliseconds() - lookBackMs;
        log.warn("LifecycleManagerState not available. Try to seek consumer to {}", new Date(seekTimestampMs));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            timestampsToSearch.put(new TopicPartition(Topic.TIER_TOPIC_NAME, partition), seekTimestampMs);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsetsByTime = (Map) new RetryFramework(this.config.maxRetries).call(map2 -> {
            return this.consumer.offsetsForTimes(map2);
        }, timestampsToSearch);

        for (int partition = 0; partition < numPartitions; partition++) {
            TopicPartition topicPartition = new TopicPartition(Topic.TIER_TOPIC_NAME, partition);
            OffsetAndTimestamp found = offsetsByTime.get(topicPartition);
            if (found != null) {
                log.warn("Consumer to seek to {} for {}", found, topicPartition);
                this.consumer.seek(topicPartition, found.offset());
            } else {
                log.warn("Consumer to seek to the beginning for {}", topicPartition);
                this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
        }
    }

    /**
     * Deserializes every polled record and keeps those of type {@code SegmentDeleteComplete}.
     *
     * @param consumerRecords the batch returned by a poll
     * @return one deletion record per {@code SegmentDeleteComplete} record in the batch
     * @throws RuntimeException if any record in the batch cannot be deserialized
     */
    private static List<ObjectStoreUtils.DeletionRecord> filterDeleteRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
        int processed = 0;
        List<ObjectStoreUtils.DeletionRecord> deletions = new ArrayList<>();
        for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
            processed++;
            AbstractTierMetadata metadata = AbstractTierMetadata.deserialize(record.key(), record.value(), record.timestamp())
                    .orElseThrow(() -> new RuntimeException("Could not deserialize tier metadata record from partition: " + record.partition() + " at offset: " + record.offset()));
            if (metadata.type() == TierRecordType.SegmentDeleteComplete) {
                AbstractTierSegmentMetadata segmentMetadata = (AbstractTierSegmentMetadata) metadata;
                deletions.add(new ObjectStoreUtils.DeletionRecord(segmentMetadata.objectId(), segmentMetadata.topicIdPartition(), Long.valueOf(segmentMetadata.timestamp())));
            }
        }
        log.debug("Processed {} total records and filtered {} records of type SegmentDeleteComplete", processed, deletions.size());
        return deletions;
    }

    /**
     * Tells whether the consumer is still behind the end-offset snapshot on any partition.
     *
     * @return {@code true} as soon as one partition's position is below its recorded end offset,
     *     {@code false} when every partition has reached it
     */
    public boolean hasMoreRecordsToConsume() throws InterruptedException {
        for (Map.Entry<TopicPartition, Long> entry : this.endOffsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            Long endOffset = entry.getValue();
            long position = ((Long) new RetryFramework(this.config.maxRetries).call(topicPartition -> {
                return Long.valueOf(this.consumer.position(topicPartition));
            }, partition)).longValue();
            if (position < endOffset.longValue()) {
                log.debug("Consumer position for {} is at {} and end offset is {}", partition, position, endOffset);
                return true;
            }
        }
        log.debug("Consumer has consumed till the end of all the tier state partitions");
        return false;
    }

    /** Closes the underlying consumer if one is present. */
    public void maybeCloseConsumer() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    /**
     * Refreshes {@code currentPositions} from the consumer and reports the total lag, computed as
     * the sum over all partitions of {@code max(0, endOffset - position)} against the end-offset
     * snapshot, to the log and to the configured lag recorder.
     */
    private void recordConsumerPositionsAndLag() throws InterruptedException {
        long totalLag = 0;
        for (int partition = 0; partition < this.config.tierMetadataNumPartitions; partition++) {
            TopicPartition topicPartition = new TopicPartition(Topic.TIER_TOPIC_NAME, partition);
            this.currentPositions[partition] = ((Long) new RetryFramework(this.config.maxRetries).call(topicPartition2 -> {
                return Long.valueOf(this.consumer.position(topicPartition2));
            }, topicPartition)).longValue();
            Long endOffset = this.endOffsets.get(topicPartition);
            if (endOffset != null) {
                totalLag += Math.max(0L, endOffset.longValue() - this.currentPositions[partition]);
            }
        }
        log.info("Total lag (in offsets) for LifecycleManager's consumer {}", totalLag);
        this.config.recordLag.accept(Long.valueOf(totalLag));
    }

    /** Builds the consumer client id from the cluster id and the broker id. */
    private static String clientId(String clusterId, int brokerId) {
        return ConsumerProtocol.PROTOCOL_TYPE + "-" + clusterId + "-" + brokerId + "-CustomLifecycleManager";
    }

    /** Builds the consumer group id from the cluster id. */
    private static String groupId(String clusterId) {
        return clusterId + "-CustomLifecycleManager";
    }

    /**
     * Builds the consumer properties: the inter-broker client configs, overridden with this
     * reader's client id, group id, byte-array deserializers and auto-commit disabled, and with
     * any configured metric reporters removed.
     *
     * @param tierTopicReaderConfig configuration supplying the base client configs and the ids
     * @return a new {@link Properties} instance
     */
    public static Properties getProperties(TierTopicReaderConfig tierTopicReaderConfig) {
        Properties properties = new Properties();
        properties.putAll(tierTopicReaderConfig.interBrokerClientConfigs.get());
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId(tierTopicReaderConfig.clusterId, tierTopicReaderConfig.brokerId));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId(tierTopicReaderConfig.clusterId));
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.remove(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG);
        return properties;
    }
}
