/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.batch.item.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

public class KafkaItemReader<K, V>
extends AbstractItemStreamItemReader<V> {
    private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets";
    private static final long DEFAULT_POLL_TIMEOUT = 30L;
    private List<TopicPartition> topicPartitions;
    private Map<TopicPartition, Long> partitionOffsets;
    private KafkaConsumer<K, V> kafkaConsumer;
    private Properties consumerProperties;
    private Iterator<ConsumerRecord<K, V>> consumerRecords;
    private Duration pollTimeout = Duration.ofSeconds(30L);
    private boolean saveState = true;

    public KafkaItemReader(Properties consumerProperties, String topicName, Integer ... partitions) {
        this(consumerProperties, topicName, Arrays.asList(partitions));
    }

    public KafkaItemReader(Properties consumerProperties, String topicName, List<Integer> partitions) {
        Assert.notNull((Object)consumerProperties, (String)"Consumer properties must not be null");
        Assert.isTrue((boolean)consumerProperties.containsKey("bootstrap.servers"), (String)"bootstrap.servers property must be provided");
        Assert.isTrue((boolean)consumerProperties.containsKey("group.id"), (String)"group.id property must be provided");
        Assert.isTrue((boolean)consumerProperties.containsKey("key.deserializer"), (String)"key.deserializer property must be provided");
        Assert.isTrue((boolean)consumerProperties.containsKey("value.deserializer"), (String)"value.deserializer property must be provided");
        this.consumerProperties = consumerProperties;
        Assert.hasLength((String)topicName, (String)"Topic name must not be null or empty");
        Assert.isTrue((!partitions.isEmpty() ? 1 : 0) != 0, (String)"At least one partition must be provided");
        this.topicPartitions = new ArrayList<TopicPartition>();
        for (Integer partition : partitions) {
            this.topicPartitions.add(new TopicPartition(topicName, partition.intValue()));
        }
    }

    public void setPollTimeout(Duration pollTimeout) {
        Assert.notNull((Object)pollTimeout, (String)"pollTimeout must not be null");
        Assert.isTrue((!pollTimeout.isZero() ? 1 : 0) != 0, (String)"pollTimeout must not be zero");
        Assert.isTrue((!pollTimeout.isNegative() ? 1 : 0) != 0, (String)"pollTimeout must not be negative");
        this.pollTimeout = pollTimeout;
    }

    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    public boolean isSaveState() {
        return this.saveState;
    }

    public void setPartitionOffsets(Map<TopicPartition, Long> partitionOffsets) {
        this.partitionOffsets = partitionOffsets;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        this.kafkaConsumer = new KafkaConsumer(this.consumerProperties);
        if (this.partitionOffsets == null) {
            this.partitionOffsets = new HashMap<TopicPartition, Long>();
            for (TopicPartition topicPartition : this.topicPartitions) {
                this.partitionOffsets.put(topicPartition, 0L);
            }
        }
        if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
            Map offsets = (Map)executionContext.get(TOPIC_PARTITION_OFFSETS);
            for (Map.Entry entry : offsets.entrySet()) {
                this.partitionOffsets.put((TopicPartition)entry.getKey(), (Long)entry.getValue() == 0L ? 0L : (Long)entry.getValue() + 1L);
            }
        }
        this.kafkaConsumer.assign(this.topicPartitions);
        this.partitionOffsets.forEach((arg_0, arg_1) -> this.kafkaConsumer.seek(arg_0, arg_1));
    }

    @Override
    @Nullable
    public V read() {
        if (this.consumerRecords == null || !this.consumerRecords.hasNext()) {
            this.consumerRecords = this.kafkaConsumer.poll(this.pollTimeout).iterator();
        }
        if (this.consumerRecords.hasNext()) {
            ConsumerRecord<K, V> record = this.consumerRecords.next();
            this.partitionOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            return (V)record.value();
        }
        return null;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        if (this.saveState) {
            executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<TopicPartition, Long>(this.partitionOffsets));
        }
        this.kafkaConsumer.commitSync();
    }

    @Override
    public void close() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
    }
}

