package org.apache.camel.component.kafka.consumer.support.streaming;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessor;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.class */
/**
 * Record processor that wraps a single Kafka {@link ConsumerRecord} in a Camel {@link Exchange}, passes it to the
 * configured {@link Processor} and reports the outcome as a {@link ProcessingResult}.
 */
public final class KafkaRecordStreamingProcessor extends KafkaRecordProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordStreamingProcessor.class);

    /** Snapshot of {@link KafkaConfiguration#isAutoCommitEnable()} taken at construction time. */
    private final boolean autoCommitEnabled;
    private final KafkaConfiguration configuration;
    private final Processor processor;
    private final CommitManager commitManager;

    /**
     * Creates the processor.
     *
     * @param kafkaConfiguration endpoint configuration; its auto-commit flag is read once, here
     * @param processor          the processor every created exchange is handed to
     * @param commitManager      used to record offsets, to commit on a break-on-first-error failure and to build
     *                           the manual-commit header value
     */
    public KafkaRecordStreamingProcessor(KafkaConfiguration kafkaConfiguration, Processor processor, CommitManager commitManager) {
        this.configuration = kafkaConfiguration;
        this.processor = processor;
        this.commitManager = commitManager;
        this.autoCommitEnabled = kafkaConfiguration.isAutoCommitEnable();
    }

    /**
     * Processes one consumer record: creates an exchange, fills in message and headers, invokes the processor and
     * records the offset unless the result signals a break-on-error.
     *
     * @param kafkaConsumer  consumer used to create and release the exchange and to obtain the exception handler
     * @param topicPartition partition the record belongs to
     * @param z              flag combined with {@code z2} for the LAST_POLL_RECORD header, which is {@code true}
     *                       only when both are {@code false}
     *                       (NOTE(review): presumably "partition has more records" - name lost in decompilation)
     * @param z2             flag whose negation is written to the LAST_RECORD_BEFORE_COMMIT header
     *                       (NOTE(review): presumably "record has a successor" - name lost in decompilation)
     * @param consumerRecord the Kafka record to process
     * @return a result built from the break-on-error flag and a flag telling whether the exchange carried an
     *         exception after processing
     */
    public ProcessingResult processExchange(KafkaConsumer kafkaConsumer, TopicPartition topicPartition, boolean z, boolean z2, ConsumerRecord<Object, Object> consumerRecord) {
        final Exchange exchange = kafkaConsumer.createExchange(false);
        final Message in = exchange.getMessage();
        setupExchangeMessage(in, consumerRecord);
        propagateHeaders(configuration, consumerRecord, exchange);

        // Both header branches below publish the same LAST_POLL_RECORD value, so derive it once.
        final boolean lastPollRecord = !z2 && !z;

        if (!autoCommitEnabled) {
            in.setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !z2);
            in.setHeader(KafkaConstants.LAST_POLL_RECORD, lastPollRecord);
        }

        if (configuration.isAllowManualCommit()) {
            in.setHeader(KafkaConstants.MANUAL_COMMIT,
                    commitManager.getManualCommit(exchange, topicPartition, consumerRecord));
            in.setHeader(KafkaConstants.LAST_POLL_RECORD, lastPollRecord);
        }

        try {
            processor.process(exchange);
        } catch (Exception thrown) {
            // Anything thrown by the processor is carried on the exchange instead of propagating to the caller.
            exchange.setException(thrown);
        }

        final boolean failed = exchange.getException() != null;
        boolean breakOnError = false;
        if (failed) {
            LOG.debug("An exception was thrown for consumerRecord at partition {} and offset {}",
                    consumerRecord.partition(), consumerRecord.offset());
            breakOnError = processException(exchange, topicPartition, consumerRecord, kafkaConsumer.getExceptionHandler());
        }
        final ProcessingResult outcome = new ProcessingResult(breakOnError, failed);

        // The offset is recorded for successes and for failures that do not break on error.
        if (!outcome.isBreakOnErrorHit()) {
            commitManager.recordOffset(topicPartition, consumerRecord.offset());
        }

        kafkaConsumer.releaseExchange(exchange, false);
        return outcome;
    }

    /**
     * Handles an exchange that finished with an exception.
     *
     * <p>With break-on-first-error enabled, the failure is logged at WARN level, the partition is committed through
     * the commit manager and {@code true} is returned. Otherwise the exception is passed to the given handler and
     * {@code false} is returned.
     *
     * @param exchange         the failed exchange; its exception must be non-null
     * @param topicPartition   partition the failing record came from
     * @param consumerRecord   the failing record (used for log output only)
     * @param exceptionHandler handler invoked when break-on-first-error is disabled
     * @return {@code true} if break-on-first-error is enabled, {@code false} otherwise
     */
    private boolean processException(Exchange exchange, TopicPartition topicPartition, ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
        if (configuration.isBreakOnFirstError()) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Error during processing {} from topic: {} due to {}",
                        exchange, topicPartition.topic(), exchange.getException().getMessage());
                // NOTE(review): no seek is performed in this method; presumably the caller does it - confirm.
                LOG.warn("Will seek consumer to offset {} on partition {} and start polling again.",
                        consumerRecord.offset(), consumerRecord.partition());
            }
            commitManager.commit(topicPartition);
            return true;
        }

        exceptionHandler.handleException("Error during processing", exchange, exchange.getException());
        return false;
    }
}
