package org.apache.camel.component.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer.class */
public class KafkaConsumer extends DefaultConsumer {
    // Shared SLF4J logger; the nested KafkaFetchRecords tasks log through it as well.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    // Executor the fetch tasks are submitted to; assigned in doStart() and reset to null in doStop().
    protected ExecutorService executor;
    // Endpoint this consumer was created for; every configuration lookup goes through it.
    private final KafkaEndpoint endpoint;
    // Processor that each exchange built from a consumed record is handed to.
    private final Processor processor;
    // Poll timeout read from the endpoint configuration once, in the constructor.
    private final Long pollTimeoutMs;
    // Fetch tasks created by doStart(); doStop() wakes them up if needed and clears the list.
    private final List<KafkaFetchRecords> tasks;
    // Set to true when doStart() started the offset repository itself, so doStop() stops it again.
    private volatile boolean stopOffsetRepo;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaConsumer$KafkaFetchRecords.class */
    public class KafkaFetchRecords implements Runnable, ConsumerRebalanceListener {
        /**
         * Kafka client used by this task; (re)created by {@link #doInit()} and closed at the end of
         * every {@link #doRun()}. Declared with {@code Object} key/value types instead of as a raw
         * type so that {@code assignment()} and {@code poll()} keep their generic element types.
         */
        private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;
        /** Topic name(s) as configured on the endpoint; split on {@code ','} when subscribing by name. */
        private final String topicName;
        /** Compiled topic pattern, or {@code null} when subscribing by topic name. */
        private final Pattern topicPattern;
        /** Identifier of this task used in log and error messages. */
        private final String threadId;
        /** Properties the Kafka client is created from. */
        private final Properties kafkaProps;
        /** Offset of the last successfully processed record, keyed by the serialized topic/partition key. */
        private final Map<String, Long> lastProcessedOffset = new ConcurrentHashMap<>();

        /**
         * Creates a fetch task. The Kafka client itself is not created here; see {@link #doInit()}.
         *
         * @param str        topic name(s) stored as {@code topicName}
         * @param pattern    compiled topic pattern, or {@code null} to subscribe by name
         * @param str2       suffix used to build the task's thread id
         * @param properties properties the Kafka client will be created from
         */
        KafkaFetchRecords(String str, Pattern pattern, String str2, Properties properties) {
            this.kafkaProps = properties;
            this.topicPattern = pattern;
            this.topicName = str;
            // Thread id has the form "<topic>-Thread <suffix>".
            this.threadId = this.topicName + "-Thread " + str2;
        }

        /**
         * Task entry point: calls {@link #doRun()} repeatedly for as long as it asks for a reconnect.
         * Before every attempt except the first, a new Kafka client is created via {@link #doInit()}
         * (failures are only logged) and the thread sleeps for the configured poll timeout.
         */
        @Override // java.lang.Runnable
        public void run() {
            boolean firstAttempt = true;
            boolean reconnect;
            do {
                if (!firstAttempt) {
                    // The previous doRun() closed its client, so build a fresh one.
                    try {
                        doInit();
                    } catch (Throwable initFailure) {
                        LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due {}", initFailure.getMessage(), initFailure);
                    }

                    // Wait one poll timeout before trying again.
                    long delayMs = KafkaConsumer.this.endpoint.getConfiguration().getPollTimeoutMs().longValue();
                    LOG.info("Reconnecting {} to topic {} after {} ms", this.threadId, this.topicName, delayMs);
                    try {
                        Thread.sleep(delayMs);
                    } catch (InterruptedException interrupted) {
                        // Preserve the interrupt status for whoever inspects it next.
                        Thread.currentThread().interrupt();
                    }
                }
                firstAttempt = false;
                reconnect = doRun();
            } while (reconnect);
            LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", this.threadId, this.topicName);
        }

        /** Creates the Kafka client up front by delegating to {@link #doInit()}. */
        void preInit() {
            this.doInit();
        }

        /**
         * Creates a new Kafka client from {@code kafkaProps}. While the client is constructed the
         * thread's context class loader is switched to the one that loaded the Kafka client class;
         * the previous loader is restored afterwards, whether or not construction succeeded.
         */
        protected void doInit() {
            final Thread current = Thread.currentThread();
            final ClassLoader previousLoader = current.getContextClassLoader();
            try {
                current.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(this.kafkaProps);
            } finally {
                current.setContextClassLoader(previousLoader);
            }
        }

        /**
         * Subscribes the current Kafka client, positions it, then polls and processes records until
         * the consumer is no longer allowed to run, is stopping/suspending, or a reconnect is needed.
         * The client is always closed before this method returns or throws.
         *
         * @return {@code true} when the caller should create a fresh client and call this method
         *         again (a {@link KafkaException} occurred while consuming, or processing failed with
         *         {@code breakOnFirstError} enabled); {@code false} otherwise
         */
        @SuppressWarnings("unchecked") // poll() results are narrowed here so this also compiles against a raw consumer field
        protected boolean doRun() {
            boolean reconnect = false;
            boolean unsubscribing = false;
            try {
                if (this.topicPattern != null) {
                    LOG.info("Subscribing {} to topic pattern {}", this.threadId, this.topicName);
                    this.consumer.subscribe(this.topicPattern, this);
                } else {
                    LOG.info("Subscribing {} to topic {}", this.threadId, this.topicName);
                    this.consumer.subscribe(Arrays.asList(this.topicName.split(",")), this);
                }

                StateRepository<String, String> offsetRepository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();
                seekToInitialOffsets(offsetRepository);

                while (KafkaConsumer.this.isRunAllowed() && !reconnect
                        && !KafkaConsumer.this.isStoppingOrStopped() && !KafkaConsumer.this.isSuspendingOrSuspended()) {
                    LOG.trace("Polling {} from topic: {} with timeout: {}", this.threadId, this.topicName, KafkaConsumer.this.pollTimeoutMs);
                    ConsumerRecords<Object, Object> polled = this.consumer.poll(KafkaConsumer.this.pollTimeoutMs.longValue());

                    boolean breakOnErrorHit = false;
                    for (TopicPartition partition : polled.partitions()) {
                        List<ConsumerRecord<Object, Object>> partitionRecords = polled.records(partition);
                        LOG.debug("Records count {} received for partition {}", partitionRecords.size(), partition);
                        // Once a failure has requested a break, the remaining partitions of this batch are skipped.
                        if (!breakOnErrorHit) {
                            breakOnErrorHit = processPartitionRecords(offsetRepository, partition, partitionRecords);
                        }
                    }
                    if (breakOnErrorHit) {
                        reconnect = true;
                    }
                }

                if (!reconnect && KafkaConsumer.this.isAutoCommitEnabled()) {
                    if ("async".equals(KafkaConsumer.this.endpoint.getConfiguration().getAutoCommitOnStop())) {
                        LOG.info("Auto commitAsync on stop {} from topic {}", this.threadId, this.topicName);
                        this.consumer.commitAsync();
                    } else if ("sync".equals(KafkaConsumer.this.endpoint.getConfiguration().getAutoCommitOnStop())) {
                        LOG.info("Auto commitSync on stop {} from topic {}", this.threadId, this.topicName);
                        this.consumer.commitSync();
                    }
                }

                LOG.info("Unsubscribing {} from topic {}", this.threadId, this.topicName);
                // From here on a KafkaException is reported as an unsubscribe failure instead of triggering a reconnect.
                unsubscribing = true;
                this.consumer.unsubscribe();
            } catch (InterruptException e) {
                // InterruptException is a subclass of KafkaException, so it has to be caught first.
                KafkaConsumer.this.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", e);
                LOG.info("Unsubscribing {} from topic {}", this.threadId, this.topicName);
                this.consumer.unsubscribe();
                Thread.currentThread().interrupt();
            } catch (KafkaException e) {
                if (unsubscribing) {
                    KafkaConsumer.this.getExceptionHandler().handleException("Error unsubscribing " + this.threadId + " from kafka topic " + this.topicName, e);
                } else {
                    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run", this.threadId, this.topicName, e.getMessage());
                    reconnect = true;
                }
            } catch (Exception e) {
                KafkaConsumer.this.getExceptionHandler().handleException("Error consuming " + this.threadId + " from kafka topic", e);
            } finally {
                LOG.debug("Closing {}", this.threadId);
                IOHelper.close(this.consumer);
            }
            return reconnect;
        }

        /**
         * Positions the freshly subscribed client. With an offset repository, each assigned partition
         * is moved to the stored offset + 1, or, when nothing is stored, back to the first record
         * returned by the initial poll. Without a repository, the {@code seekTo} option
         * ("beginning" / "end") is honoured if set.
         */
        @SuppressWarnings("unchecked") // see doRun(): tolerate a raw consumer field
        private void seekToInitialOffsets(StateRepository<String, String> offsetRepository) {
            if (offsetRepository != null) {
                // The initial poll happens before assignment() is read; its records are used as fallback positions.
                ConsumerRecords<Object, Object> firstBatch = this.consumer.poll(100L);
                Collection<TopicPartition> assigned = this.consumer.assignment();
                for (TopicPartition partition : assigned) {
                    String storedOffset = offsetRepository.getState(KafkaConsumer.this.serializeOffsetKey(partition));
                    if (storedOffset != null && !storedOffset.isEmpty()) {
                        // The repository holds the last processed offset, so resume right after it.
                        long resumeAt = KafkaConsumer.this.deserializeOffsetValue(storedOffset) + 1;
                        LOG.debug("Resuming partition {} from offset {} from state", partition.partition(), resumeAt);
                        this.consumer.seek(partition, resumeAt);
                    } else {
                        List<ConsumerRecord<Object, Object>> fetched = firstBatch.records(partition);
                        if (!fetched.isEmpty()) {
                            // Rewind so the records consumed by the initial poll are delivered again.
                            long firstOffset = fetched.get(0).offset();
                            LOG.debug("Resuming partition {} from offset {}", partition.partition(), firstOffset);
                            this.consumer.seek(partition, firstOffset);
                        }
                    }
                }
            } else if (KafkaConsumer.this.endpoint.getConfiguration().getSeekTo() != null) {
                if (KafkaConsumer.this.endpoint.getConfiguration().getSeekTo().equals("beginning")) {
                    LOG.debug("{} is seeking to the beginning on topic {}", this.threadId, this.topicName);
                    this.consumer.poll(Duration.ofMillis(100L));
                    this.consumer.seekToBeginning(this.consumer.assignment());
                } else if (KafkaConsumer.this.endpoint.getConfiguration().getSeekTo().equals("end")) {
                    LOG.debug("{} is seeking to the end on topic {}", this.threadId, this.topicName);
                    this.consumer.poll(Duration.ofMillis(100L));
                    this.consumer.seekToEnd(this.consumer.assignment());
                }
            }
        }

        /**
         * Turns each record of one partition into an exchange and runs it through the processor.
         * If all records were handled, the last successful offset is committed (non-forced).
         *
         * @return {@code true} if processing failed with {@code breakOnFirstError} enabled; in that
         *         case the last successful offset has been force-committed and the rest was skipped
         */
        private boolean processPartitionRecords(StateRepository<String, String> offsetRepository, TopicPartition partition,
                                                List<ConsumerRecord<Object, Object>> partitionRecords) {
            long lastSuccessfulOffset = -1;
            Iterator<ConsumerRecord<Object, Object>> it = partitionRecords.iterator();
            while (it.hasNext()) {
                ConsumerRecord<Object, Object> record = it.next();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
                }

                Exchange exchange = KafkaConsumer.this.endpoint.createKafkaExchange(record);
                KafkaConsumer.this.propagateHeaders(record, exchange, KafkaConsumer.this.endpoint.getConfiguration());
                if (!KafkaConsumer.this.isAutoCommitEnabled()) {
                    // Flags the last record of this partition's batch.
                    exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, !it.hasNext());
                }
                if (KafkaConsumer.this.endpoint.getConfiguration().isAllowManualCommit()) {
                    exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT,
                            KafkaConsumer.this.endpoint.m5getComponent().getKafkaManualCommitFactory().newInstance(
                                    exchange, this.consumer, this.topicName, this.threadId, offsetRepository, partition, record.offset()));
                }

                try {
                    KafkaConsumer.this.processor.process(exchange);
                } catch (Exception e) {
                    exchange.setException(e);
                }

                if (exchange.getException() == null) {
                    lastSuccessfulOffset = record.offset();
                    this.lastProcessedOffset.put(KafkaConsumer.this.serializeOffsetKey(partition), lastSuccessfulOffset);
                } else if (KafkaConsumer.this.endpoint.getConfiguration().isBreakOnFirstError()) {
                    LOG.warn("Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.", exchange, this.topicName, lastSuccessfulOffset);
                    // Force-commit what succeeded so far so that the failed record is polled again after the reconnect.
                    commitOffset(offsetRepository, partition, lastSuccessfulOffset, true);
                    return true;
                } else {
                    KafkaConsumer.this.getExceptionHandler().handleException("Error during processing", exchange, exchange.getException());
                }
            }
            commitOffset(offsetRepository, partition, lastSuccessfulOffset, false);
            return false;
        }

        /**
         * Records {@code j} as the last processed offset of the given partition. Does nothing when
         * {@code j} is {@code -1}.
         *
         * @param stateRepository offset repository, or {@code null} if none is configured
         * @param topicPartition  partition the offset belongs to
         * @param j               last processed offset, or {@code -1} if no record was processed
         * @param z               when {@code true} and the repository is not used, a synchronous
         *                        commit of {@code j + 1} is issued on the Kafka client
         */
        private void commitOffset(StateRepository<String, String> stateRepository, TopicPartition topicPartition, long j, boolean z) {
            if (j == -1) {
                return;
            }
            // The repository is only written when manual commit is disabled.
            boolean storeInRepository = !KafkaConsumer.this.endpoint.getConfiguration().isAllowManualCommit() && stateRepository != null;
            if (storeInRepository) {
                LOG.debug("Saving offset repository state {} from topic {} with offset: {}", this.threadId, this.topicName, j);
                String key = KafkaConsumer.this.serializeOffsetKey(topicPartition);
                stateRepository.setState(key, KafkaConsumer.this.serializeOffsetValue(j));
                return;
            }
            if (z) {
                LOG.debug("Forcing commitSync {} from topic {} with offset: {}", this.threadId, this.topicName, j);
                OffsetAndMetadata next = new OffsetAndMetadata(j + 1);
                this.consumer.commitSync(Collections.singletonMap(topicPartition, next));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Calls {@code wakeup()} on this task's Kafka client; used by the outer {@code doStop()}. */
        public void shutdown() {
            final org.apache.kafka.clients.consumer.KafkaConsumer client = this.consumer;
            client.wakeup();
        }

        /**
         * Rebalance callback: for every revoked partition, force-commits the last processed offset
         * (if there is one) and drops it from {@code lastProcessedOffset}.
         *
         * @param collection partitions being revoked from this client
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            LOG.debug("onPartitionsRevoked: {} from topic {}", this.threadId, this.topicName);
            StateRepository<String, String> offsetRepository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();
            for (TopicPartition partition : collection) {
                String offsetKey = KafkaConsumer.this.serializeOffsetKey(partition);
                Long recorded = this.lastProcessedOffset.get(offsetKey);
                // -1 means "nothing processed"; commitOffset() ignores it.
                long offset = recorded != null ? recorded.longValue() : -1L;
                LOG.debug("Saving offset repository state {} from offsetKey {} with offset: {}", this.threadId, offsetKey, offset);
                commitOffset(offsetRepository, partition, offset, true);
                this.lastProcessedOffset.remove(offsetKey);
            }
        }

        /**
         * Rebalance callback: when an offset repository is configured, seeks every newly assigned
         * partition that has stored state to the stored offset + 1.
         *
         * @param collection partitions newly assigned to this client
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            LOG.debug("onPartitionsAssigned: {} from topic {}", this.threadId, this.topicName);
            StateRepository<String, String> offsetRepository = KafkaConsumer.this.endpoint.getConfiguration().getOffsetRepository();
            if (offsetRepository == null) {
                return;
            }
            for (TopicPartition partition : collection) {
                String storedOffset = offsetRepository.getState(KafkaConsumer.this.serializeOffsetKey(partition));
                if (storedOffset == null || storedOffset.isEmpty()) {
                    // No stored state for this partition: leave the position untouched.
                    continue;
                }
                long resumeAt = KafkaConsumer.this.deserializeOffsetValue(storedOffset) + 1;
                LOG.debug("Resuming partition {} from offset {} from state", partition.partition(), resumeAt);
                this.consumer.seek(partition, resumeAt);
            }
        }
    }

    public KafkaConsumer(KafkaEndpoint kafkaEndpoint, Processor processor) {
        super(kafkaEndpoint, processor);
        this.tasks = new ArrayList();
        this.endpoint = kafkaEndpoint;
        this.processor = processor;
        this.pollTimeoutMs = kafkaEndpoint.getConfiguration().getPollTimeoutMs();
        if (ObjectHelper.isEmpty(kafkaEndpoint.getConfiguration().getBrokers())) {
            throw new IllegalArgumentException("Brokers must be configured");
        }
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the endpoint of this consumer narrowed to {@link KafkaEndpoint}.
     *
     * @return the endpoint returned by {@code super.getEndpoint()}, cast to {@link KafkaEndpoint}
     */
    public KafkaEndpoint m3getEndpoint() {
        // super.getEndpoint() is declared with the general endpoint type, so the narrowing must be explicit.
        return (KafkaEndpoint) super.getEndpoint();
    }

    /**
     * Builds the properties for a Kafka client: the configuration's consumer properties, passed
     * through the endpoint's {@code updateClassProperties}, plus {@code bootstrap.servers} and
     * {@code group.id}. A random UUID is used as group id when none is configured.
     *
     * @return the properties to create a Kafka client with
     * @throws IllegalArgumentException if the brokers option is {@code null}
     */
    Properties getProps() {
        Properties props = this.endpoint.getConfiguration().createConsumerProperties();
        this.endpoint.updateClassProperties(props);

        String brokers = this.endpoint.getConfiguration().getBrokers();
        if (brokers == null) {
            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
        }
        props.put("bootstrap.servers", brokers);

        String configuredGroupId = this.endpoint.getConfiguration().getGroupId();
        if (configuredGroupId == null) {
            String generatedGroupId = UUID.randomUUID().toString();
            props.put("group.id", generatedGroupId);
            LOG.debug("Kafka consumer groupId is {} (generated)", generatedGroupId);
        } else {
            props.put("group.id", configuredGroupId);
            LOG.debug("Kafka consumer groupId is {}", configuredGroupId);
        }
        return props;
    }

    /**
     * Starts the consumer: starts the offset repository if it is a {@link ServiceSupport} that is
     * not started yet (remembering that so {@code doStop()} stops it again), creates the executor,
     * and submits one {@link KafkaFetchRecords} task per configured consumer. Each task's Kafka
     * client is created before the task is submitted.
     *
     * @throws Exception if the parent start-up, the repository start-up or a client creation fails
     */
    @Override
    protected void doStart() throws Exception {
        LOG.info("Starting Kafka consumer on topic: {} with breakOnFirstError: {}", this.endpoint.getConfiguration().getTopic(), this.endpoint.getConfiguration().isBreakOnFirstError());
        super.doStart();

        // Keep the repository's declared type and narrow explicitly: a StateRepository is not
        // necessarily a ServiceSupport, so it cannot simply be assigned to one.
        StateRepository<String, String> offsetRepository = this.endpoint.getConfiguration().getOffsetRepository();
        if (offsetRepository instanceof ServiceSupport && !((ServiceSupport) offsetRepository).isStarted()) {
            this.stopOffsetRepo = true;
            LOG.debug("Starting OffsetRepository: {}", offsetRepository);
            ServiceHelper.startService(offsetRepository);
        }

        this.executor = this.endpoint.createExecutor();
        String topic = this.endpoint.getConfiguration().getTopic();
        Pattern topicPattern = this.endpoint.getConfiguration().isTopicIsPattern() ? Pattern.compile(topic) : null;
        for (int i = 0; i < this.endpoint.getConfiguration().getConsumersCount(); i++) {
            KafkaFetchRecords task = new KafkaFetchRecords(topic, topicPattern, Integer.toString(i), getProps());
            // Create the client on the calling thread so that failures surface from doStart().
            task.preInit();
            this.executor.submit(task);
            this.tasks.add(task);
        }
    }

    /**
     * Stops the consumer: shuts the executor down (gracefully with the configured timeout when a
     * Camel context is available, otherwise immediately), wakes up the tasks' Kafka clients if the
     * executor has not terminated, and stops the offset repository if {@code doStart()} started it.
     *
     * @throws Exception if stopping the offset repository or the parent fails
     */
    @Override
    protected void doStop() throws Exception {
        LOG.info("Stopping Kafka consumer on topic: {}", this.endpoint.getConfiguration().getTopic());

        if (this.executor != null) {
            boolean contextAvailable = m3getEndpoint() != null && m3getEndpoint().getCamelContext() != null;
            if (contextAvailable) {
                int timeoutMillis = m3getEndpoint().getConfiguration().getShutdownTimeout();
                LOG.debug("Shutting down Kafka consumer worker threads with timeout {} millis", timeoutMillis);
                m3getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor, timeoutMillis);
            } else {
                this.executor.shutdownNow();
            }

            if (!this.executor.isTerminated()) {
                // Tasks are still running: wake their clients up, then force the shutdown.
                for (KafkaFetchRecords task : this.tasks) {
                    task.shutdown();
                }
                this.executor.shutdownNow();
            }
        }
        this.tasks.clear();
        this.executor = null;

        if (this.stopOffsetRepo) {
            StateRepository<String, String> offsetRepository = this.endpoint.getConfiguration().getOffsetRepository();
            LOG.debug("Stopping OffsetRepository: {}", offsetRepository);
            ServiceHelper.stopAndShutdownService(offsetRepository);
        }

        super.doStop();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Copies the Kafka record headers that pass the configured header filter strategy onto the
     * exchange's in message, deserializing each value with the configured header deserializer.
     *
     * @param consumerRecord     record whose headers are read
     * @param exchange           exchange receiving the headers
     * @param kafkaConfiguration source of the header filter strategy and header deserializer
     */
    public void propagateHeaders(ConsumerRecord<Object, Object> consumerRecord, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
        HeaderFilterStrategy filterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
        KafkaHeaderDeserializer deserializer = kafkaConfiguration.getKafkaHeaderDeserializer();
        for (Header header : consumerRecord.headers()) {
            // Despite its name, shouldBeFiltered() returns true for headers that are kept.
            if (!shouldBeFiltered(header, exchange, filterStrategy)) {
                continue;
            }
            Object value = deserializer.deserialize(header.key(), header.value());
            exchange.getIn().setHeader(header.key(), value);
        }
    }

    /**
     * Tells whether a Kafka header should be copied to the exchange.
     *
     * @return {@code true} when the strategy's {@code applyFilterToCamelHeaders} returns
     *         {@code false} for the header, {@code false} otherwise
     */
    private boolean shouldBeFiltered(Header header, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
        boolean rejectedByStrategy = headerFilterStrategy.applyFilterToCamelHeaders(header.key(), header.value(), exchange);
        return !rejectedByStrategy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} only when the configuration's {@code autoCommitEnable} value is
     *         non-null and {@code true}
     */
    public boolean isAutoCommitEnabled() {
        Boolean autoCommitEnable = this.endpoint.getConfiguration().getAutoCommitEnable();
        return Boolean.TRUE.equals(autoCommitEnable);
    }

    /**
     * Builds the key under which a partition's offset is stored.
     *
     * @param topicPartition partition to build the key for
     * @return the key in the form {@code <topic>/<partition>}
     */
    protected String serializeOffsetKey(TopicPartition topicPartition) {
        StringBuilder key = new StringBuilder();
        key.append(topicPartition.topic());
        key.append('/');
        key.append(topicPartition.partition());
        return key.toString();
    }

    /**
     * Converts an offset to its stored string form.
     *
     * @param j offset to serialize
     * @return the decimal string representation of {@code j}
     */
    protected String serializeOffsetValue(long j) {
        return Long.toString(j);
    }

    /**
     * Parses an offset from its stored string form.
     *
     * @param str decimal string representation of the offset
     * @return the parsed offset
     * @throws NumberFormatException if {@code str} is not a parsable {@code long}
     */
    protected long deserializeOffsetValue(String str) {
        final int decimalRadix = 10;
        return Long.parseLong(str, decimalRadix);
    }
}
