/*
 * Decompiled with CFR 0.152.
 */
package co.elastic.apm.agent.kafka.helper;

import co.elastic.apm.agent.configuration.CoreConfiguration;
import co.elastic.apm.agent.configuration.MessagingConfiguration;
import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.context.Message;
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.matcher.WildcardMatcher;
import co.elastic.apm.agent.shaded.slf4j.Logger;
import co.elastic.apm.agent.shaded.slf4j.LoggerFactory;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;

class ConsumerRecordsIteratorWrapper
implements Iterator<ConsumerRecord> {
    public static final Logger logger = LoggerFactory.getLogger(ConsumerRecordsIteratorWrapper.class);
    private final Iterator<ConsumerRecord> delegate;
    private final ElasticApmTracer tracer;
    private final CoreConfiguration coreConfiguration;
    private final MessagingConfiguration messagingConfiguration;

    public ConsumerRecordsIteratorWrapper(Iterator<ConsumerRecord> delegate, ElasticApmTracer tracer) {
        this.delegate = delegate;
        this.tracer = tracer;
        this.coreConfiguration = tracer.getConfig(CoreConfiguration.class);
        this.messagingConfiguration = tracer.getConfig(MessagingConfiguration.class);
    }

    @Override
    public boolean hasNext() {
        this.endCurrentTransaction();
        return this.delegate.hasNext();
    }

    public void endCurrentTransaction() {
        try {
            Transaction transaction = this.tracer.currentTransaction();
            if (transaction != null && "messaging".equals(transaction.getType())) {
                ((Transaction)transaction.deactivate()).end();
            }
        }
        catch (Exception e) {
            logger.error("Error in Kafka iterator wrapper", e);
        }
    }

    @Override
    public ConsumerRecord next() {
        this.endCurrentTransaction();
        ConsumerRecord record = this.delegate.next();
        try {
            String topic = record.topic();
            if (!WildcardMatcher.isAnyMatch(this.messagingConfiguration.getIgnoreMessageQueues(), topic)) {
                Header traceParentHeader = record.headers().lastHeader("elasticapmtraceparent");
                Transaction transaction = traceParentHeader != null ? this.tracer.startTransaction(TraceContext.fromTraceparentBinaryHeader(), traceParentHeader.value(), ConsumerRecordsIteratorWrapper.class.getClassLoader()) : this.tracer.startRootTransaction(ConsumerRecordsIteratorWrapper.class.getClassLoader());
                ((Transaction)transaction.withType("messaging").withName("Kafka record from " + topic)).activate();
                Message message = transaction.getContext().getMessage();
                message.withQueue(topic);
                if (record.timestampType() == TimestampType.CREATE_TIME) {
                    message.withAge(System.currentTimeMillis() - record.timestamp());
                }
                if (transaction.isSampled() && this.coreConfiguration.isCaptureHeaders()) {
                    for (Header header : record.headers()) {
                        String key = header.key();
                        if ("elasticapmtraceparent".equals(key) || WildcardMatcher.anyMatch(this.coreConfiguration.getSanitizeFieldNames(), key) != null) continue;
                        message.addHeader(key, header.value());
                    }
                }
                if (transaction.isSampled() && this.coreConfiguration.getCaptureBody() != CoreConfiguration.EventType.OFF) {
                    message.appendToBody("key=").appendToBody(String.valueOf(record.key())).appendToBody("; ").appendToBody("value=").appendToBody(String.valueOf(record.value()));
                }
            }
        }
        catch (Exception e) {
            logger.error("Error in transaction creation based on Kafka record", e);
        }
        return record;
    }

    @Override
    public void remove() {
        this.delegate.remove();
    }
}

