/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.Assert;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.RecordConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordConverter} that turns Kafka {@link ConsumerRecords} into {@link KafkaEventMessage}s while
 * keeping track of the position reached, expressed as a {@link KafkaTrackingToken}.
 * <p>
 * Every record that the configured {@link KafkaMessageConverter} manages to read advances the internally
 * held token to that record's topic, partition and offset. The resulting token is attached to the
 * produced {@link KafkaEventMessage} and can be retrieved afterwards through {@link #currentToken()}.
 * <p>
 * Note that the held token is a plain mutable field that is updated without any synchronization.
 *
 * @param <K> the key type of the consumed Kafka records
 * @param <V> the value type of the consumed Kafka records
 */
public class TrackingRecordConverter<K, V>
implements RecordConverter<K, V, KafkaEventMessage> {
    private static final Logger logger = LoggerFactory.getLogger(TrackingRecordConverter.class);
    private final KafkaMessageConverter<K, V> messageConverter;
    private KafkaTrackingToken currentToken;

    /**
     * Creates a converter that starts tracking from the given {@code token}.
     *
     * @param messageConverter the converter used to read each Kafka record into an event message
     * @param token            the token to start advancing from; validated through
     *                         {@link Assert#nonNull}, which rejects {@code null} with the message
     *                         "Token may not be null"
     */
    public TrackingRecordConverter(KafkaMessageConverter<K, V> messageConverter, KafkaTrackingToken token) {
        this.messageConverter = messageConverter;
        this.currentToken = Assert.nonNull(token, () -> "Token may not be null");
    }

    /**
     * Converts the given {@code records} into {@link KafkaEventMessage}s, in iteration order.
     * <p>
     * Records for which the message converter returns an empty result are skipped: they produce no
     * message and do not advance the current token.
     *
     * @param records the batch of Kafka records to convert
     * @return a list holding one {@link KafkaEventMessage} per successfully read record
     */
    @Override
    public List<KafkaEventMessage> convert(ConsumerRecords<K, V> records) {
        // Pre-size for the case in which every record converts successfully.
        List<KafkaEventMessage> eventMessages = new ArrayList<>(records.count());
        // The loop variable must carry <K, V>: a raw ConsumerRecord would erase the Optional returned by
        // readKafkaMessage and leave the lambda parameter typed as Object.
        for (ConsumerRecord<K, V> consumerRecord : records) {
            this.messageConverter.readKafkaMessage(consumerRecord).ifPresent(eventMessage -> {
                KafkaTrackingToken nextToken = this.currentToken.advancedTo(
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()
                );
                logger.debug("Advancing token from [{}] to [{}]", this.currentToken, nextToken);
                // Store the advanced token first so the emitted message carries the position of its own record.
                this.currentToken = nextToken;
                eventMessages.add(KafkaEventMessage.from(eventMessage, consumerRecord, this.currentToken));
            });
        }
        return eventMessages;
    }

    /**
     * Returns the token as advanced by the most recent successfully converted record, or the token passed
     * at construction if no record has been converted yet.
     *
     * @return the current {@link KafkaTrackingToken}
     */
    public KafkaTrackingToken currentToken() {
        return this.currentToken;
    }
}

