package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer.class */
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final ConsumerFactory<K, V> consumerFactory;
    private final TopicPartitionInitialOffset[] topicPartitions;
    private KafkaMessageListenerContainer<K, V>.ListenerConsumer listenerConsumer;
    private ListenableFuture<?> listenerConsumerFuture;
    private MessageListener<K, V> listener;
    private AcknowledgingMessageListener<K, V> acknowledgingMessageListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer.class */
    public final class ListenerConsumer implements SchedulingAwareRunnable {
        private final Log logger;
        private final ContainerProperties containerProperties;
        private final OffsetCommitCallback commitCallback;
        private final Consumer<K, V> consumer;
        private final Map<String, Map<Integer, Long>> offsets;
        private final MessageListener<K, V> listener;
        private final AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
        private final boolean autoCommit;
        private final boolean isManualAck;
        private final boolean isManualImmediateAck;
        private final boolean isAnyManualAck;
        private final boolean isRecordAck;
        private final boolean isBatchAck;
        private final BlockingQueue<ConsumerRecords<K, V>> recordsToProcess;
        private final BlockingQueue<ConsumerRecord<K, V>> acks;
        private final ApplicationEventPublisher applicationEventPublisher;
        private volatile Map<TopicPartition, Long> definedPartitions;
        private ConsumerRecords<K, V> unsent;
        private volatile Collection<TopicPartition> assignedPartitions;
        private int count;
        private volatile KafkaMessageListenerContainer<K, V>.ListenerConsumer.ListenerInvoker invoker;
        private long last;
        private volatile Future<?> listenerInvokerFuture;
        private boolean paused;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer$ConsumerAcknowledgment.class */
        public final class ConsumerAcknowledgment implements Acknowledgment {
            private final ConsumerRecord<K, V> record;
            private final boolean immediate;

            private ConsumerAcknowledgment(ConsumerRecord<K, V> consumerRecord, boolean z) {
                this.record = consumerRecord;
                this.immediate = z;
            }

            @Override // org.springframework.kafka.support.Acknowledgment
            public void acknowledge() {
                try {
                    ListenerConsumer.this.acks.put(this.record);
                    if (this.immediate) {
                        ListenerConsumer.this.consumer.wakeup();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new KafkaException("Interrupted while queuing ack for " + this.record, e);
                }
            }

            public String toString() {
                return "Acknowledgment for " + this.record;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.class */
        public final class ListenerInvoker implements SchedulingAwareRunnable {
            private final CountDownLatch exitLatch;
            private volatile boolean active;
            private volatile Thread executingThread;

            private ListenerInvoker() {
                this.exitLatch = new CountDownLatch(1);
                this.active = true;
            }

            public void run() {
                Assert.isTrue(this.active, "This instance is not active anymore");
                try {
                    this.executingThread = Thread.currentThread();
                    while (this.active) {
                        try {
                            ConsumerRecords consumerRecords = (ConsumerRecords) ListenerConsumer.this.recordsToProcess.poll(1L, TimeUnit.SECONDS);
                            if (this.active) {
                                if (consumerRecords != null) {
                                    ListenerConsumer.this.invokeListener(consumerRecords);
                                } else if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                    ListenerConsumer.this.logger.trace("No records to process");
                                }
                            }
                        } catch (InterruptedException e) {
                            if (this.active) {
                                ListenerConsumer.this.logger.debug("Interrupt ignored");
                            } else {
                                Thread.currentThread().interrupt();
                            }
                        }
                        if (!ListenerConsumer.this.isManualImmediateAck && this.active) {
                            ListenerConsumer.this.consumer.wakeup();
                        }
                    }
                } finally {
                    this.active = false;
                    this.exitLatch.countDown();
                }
            }

            public boolean isLongLived() {
                return true;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void stop() {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Stopping invoker");
                }
                this.active = false;
                try {
                    if (!this.exitLatch.await(KafkaMessageListenerContainer.this.getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS) && this.executingThread != null) {
                        if (ListenerConsumer.this.logger.isDebugEnabled()) {
                            ListenerConsumer.this.logger.debug("Interrupting invoker");
                        }
                        this.executingThread.interrupt();
                    }
                } catch (InterruptedException e) {
                    if (this.executingThread != null) {
                        this.executingThread.interrupt();
                    }
                    Thread.currentThread().interrupt();
                }
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Invoker stopped");
                }
            }
        }

        private ListenerConsumer(MessageListener<K, V> messageListener, AcknowledgingMessageListener<K, V> acknowledgingMessageListener) {
            this.logger = LogFactory.getLog(ListenerConsumer.class);
            this.containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
            this.commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
            this.offsets = new HashMap();
            this.autoCommit = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
            this.isManualAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL);
            this.isManualImmediateAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
            this.isRecordAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.RECORD);
            this.isBatchAck = this.containerProperties.getAckMode().equals(AbstractMessageListenerContainer.AckMode.BATCH);
            this.recordsToProcess = new LinkedBlockingQueue(this.containerProperties.getQueueDepth());
            this.acks = new LinkedBlockingQueue();
            this.applicationEventPublisher = KafkaMessageListenerContainer.this.getApplicationEventPublisher();
            Assert.state((this.isAnyManualAck && this.autoCommit) ? false : true, "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
            final Consumer<K, V> createConsumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
            ConsumerRebalanceListener consumerRebalanceListener = new ConsumerRebalanceListener() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    if (!ListenerConsumer.this.autoCommit) {
                        if (ListenerConsumer.this.logger.isTraceEnabled()) {
                            ListenerConsumer.this.logger.trace("Received partition revocation notification, and will stop the invoker.");
                        }
                        if (ListenerConsumer.this.listenerInvokerFuture != null) {
                            ListenerConsumer.this.stopInvokerAndCommitManualAcks();
                            ListenerConsumer.this.recordsToProcess.clear();
                            ListenerConsumer.this.unsent = null;
                        } else if (!CollectionUtils.isEmpty(collection)) {
                            ListenerConsumer.this.logger.error("Invalid state: the invoker was not active, but the consumer had allocated partitions");
                        }
                    } else if (ListenerConsumer.this.logger.isTraceEnabled()) {
                        ListenerConsumer.this.logger.trace("Received partition revocation notification, but the container is in autocommit mode, so transition will be handled by the consumer");
                    }
                    KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(collection);
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    ListenerConsumer.this.assignedPartitions = collection;
                    if (!ListenerConsumer.this.autoCommit) {
                        HashMap hashMap = new HashMap();
                        for (TopicPartition topicPartition : collection) {
                            hashMap.put(topicPartition, new OffsetAndMetadata(createConsumer.position(topicPartition)));
                        }
                        if (ListenerConsumer.this.logger.isDebugEnabled()) {
                            ListenerConsumer.this.logger.debug("Committing: " + hashMap);
                        }
                        if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
                            ListenerConsumer.this.consumer.commitSync(hashMap);
                        } else {
                            ListenerConsumer.this.consumer.commitAsync(hashMap, KafkaMessageListenerContainer.this.getContainerProperties().getCommitCallback());
                        }
                    }
                    if (!ListenerConsumer.this.autoCommit && KafkaMessageListenerContainer.this.isRunning() && !CollectionUtils.isEmpty(collection)) {
                        ListenerConsumer.this.startInvoker();
                    }
                    KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener().onPartitionsAssigned(collection);
                }
            };
            if (KafkaMessageListenerContainer.this.topicPartitions != null) {
                List<TopicPartitionInitialOffset> asList = Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
                this.definedPartitions = new HashMap(asList.size());
                for (TopicPartitionInitialOffset topicPartitionInitialOffset : asList) {
                    this.definedPartitions.put(topicPartitionInitialOffset.topicPartition(), topicPartitionInitialOffset.initialOffset());
                }
                createConsumer.assign(new ArrayList(this.definedPartitions.keySet()));
            } else if (this.containerProperties.getTopicPattern() != null) {
                createConsumer.subscribe(this.containerProperties.getTopicPattern(), consumerRebalanceListener);
            } else {
                createConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), consumerRebalanceListener);
            }
            this.consumer = createConsumer;
            this.listener = messageListener;
            this.acknowledgingMessageListener = acknowledgingMessageListener;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void startInvoker() {
            this.invoker = new ListenerInvoker();
            this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor().submit(this.invoker);
        }

        public boolean isLongLived() {
            return true;
        }

        public void run() {
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (KafkaMessageListenerContainer.this.isRunning() && this.definedPartitions != null) {
                initPartitionsIfNeeded();
                if (!this.autoCommit) {
                    startInvoker();
                }
            }
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis;
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    if (!this.autoCommit) {
                        processCommits();
                    }
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Polling (paused=" + this.paused + ")...");
                    }
                    ConsumerRecords<K, V> poll = this.consumer.poll(this.containerProperties.getPollTimeout());
                    if (poll != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + poll.count() + " records");
                    }
                    if (poll != null && poll.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            currentTimeMillis = System.currentTimeMillis();
                        }
                        if (this.autoCommit) {
                            invokeListener(poll);
                        } else if (sendToListener(poll) && this.assignedPartitions != null) {
                            this.consumer.pause((TopicPartition[]) this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()]));
                            this.paused = true;
                            this.unsent = poll;
                        }
                    } else if (this.containerProperties.getIdleEventInterval() != null) {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        if (currentTimeMillis2 > currentTimeMillis + this.containerProperties.getIdleEventInterval().longValue() && currentTimeMillis2 > j + this.containerProperties.getIdleEventInterval().longValue()) {
                            publishIdleContainerEvent(currentTimeMillis2 - currentTimeMillis);
                            j = currentTimeMillis2;
                        }
                    }
                    this.unsent = checkPause(this.unsent);
                } catch (WakeupException e) {
                    this.unsent = checkPause(this.unsent);
                } catch (Exception e2) {
                    if (this.containerProperties.getErrorHandler() != null) {
                        this.containerProperties.getErrorHandler().handle(e2, null);
                    } else {
                        this.logger.error("Container exception", e2);
                    }
                }
            }
            if (this.listenerInvokerFuture != null) {
                stopInvokerAndCommitManualAcks();
            }
            try {
                this.consumer.unsubscribe();
            } catch (WakeupException e3) {
            }
            this.consumer.close();
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Consumer stopped");
            }
        }

        private void publishIdleContainerEvent(long j) {
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(new ListenerContainerIdleEvent(KafkaMessageListenerContainer.this, j, KafkaMessageListenerContainer.this.getBeanName(), KafkaMessageListenerContainer.this.getAssignedPartitions()));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stopInvokerAndCommitManualAcks() {
            long currentTimeMillis = System.currentTimeMillis();
            this.invoker.stop();
            try {
                try {
                    try {
                        this.listenerInvokerFuture.get((this.containerProperties.getShutdownTimeout() + currentTimeMillis) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                        this.listenerInvokerFuture = null;
                    } catch (ExecutionException e) {
                        this.logger.error("Error while shutting down the listener invoker:", e);
                        this.listenerInvokerFuture = null;
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    this.listenerInvokerFuture = null;
                } catch (TimeoutException e3) {
                    this.logger.info("Invoker timed out while waiting for shutdown and will be canceled.");
                    this.listenerInvokerFuture.cancel(true);
                    this.listenerInvokerFuture = null;
                }
                processCommits();
                if (this.offsets.size() > 0) {
                    commitIfNecessary();
                }
                this.invoker = null;
            } catch (Throwable th) {
                this.listenerInvokerFuture = null;
                throw th;
            }
        }

        private ConsumerRecords<K, V> checkPause(ConsumerRecords<K, V> consumerRecords) {
            if (!this.paused || this.recordsToProcess.size() >= this.containerProperties.getQueueDepth()) {
                return consumerRecords;
            }
            this.consumer.resume((TopicPartition[]) this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()]));
            this.paused = false;
            if (consumerRecords == null) {
                return null;
            }
            try {
                sendToListener(consumerRecords);
                return null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while sending to listener", e);
            }
        }

        private boolean sendToListener(ConsumerRecords<K, V> consumerRecords) throws InterruptedException {
            if (this.containerProperties.isPauseEnabled() && CollectionUtils.isEmpty(this.definedPartitions)) {
                return !this.recordsToProcess.offer(consumerRecords, this.containerProperties.getPauseAfter(), TimeUnit.MILLISECONDS);
            }
            this.recordsToProcess.put(consumerRecords);
            return false;
        }

        private void handleAcks() {
            ConsumerRecord<K, V> poll = this.acks.poll();
            while (true) {
                ConsumerRecord<K, V> consumerRecord = poll;
                if (consumerRecord == null) {
                    return;
                }
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Ack: " + consumerRecord);
                }
                processAck(consumerRecord);
                poll = this.acks.poll();
            }
        }

        private void processAck(ConsumerRecord<K, V> consumerRecord) {
            if (!this.isManualImmediateAck) {
                addOffset(consumerRecord);
            } else {
                try {
                    ackImmediate(consumerRecord);
                } catch (WakeupException e) {
                }
            }
        }

        private void ackImmediate(ConsumerRecord<K, V> consumerRecord) {
            Map singletonMap = Collections.singletonMap(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + singletonMap);
            }
            if (this.containerProperties.isSyncCommits()) {
                this.consumer.commitSync(singletonMap);
            } else {
                this.consumer.commitAsync(singletonMap, this.commitCallback);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void invokeListener(ConsumerRecords<K, V> consumerRecords) {
            Iterator it = consumerRecords.iterator();
            while (it.hasNext() && (this.autoCommit || (this.invoker != null && ((ListenerInvoker) this.invoker).active))) {
                ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord) it.next();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Processing " + consumerRecord);
                }
                try {
                    if (this.acknowledgingMessageListener != null) {
                        this.acknowledgingMessageListener.onMessage(consumerRecord, new ConsumerAcknowledgment(consumerRecord, this.isManualImmediateAck));
                    } else {
                        this.listener.onMessage(consumerRecord);
                    }
                    if (!this.isAnyManualAck) {
                        this.acks.add(consumerRecord);
                    }
                    if (this.isRecordAck) {
                        this.consumer.wakeup();
                    }
                } catch (Exception e) {
                    if (this.containerProperties.isAckOnError()) {
                        this.acks.add(consumerRecord);
                    }
                    if (this.containerProperties.getErrorHandler() != null) {
                        this.containerProperties.getErrorHandler().handle(e, consumerRecord);
                    } else {
                        this.logger.error("Listener threw an exception and no error handler for " + consumerRecord, e);
                    }
                }
            }
            if (this.isManualAck || this.isBatchAck) {
                this.consumer.wakeup();
            }
        }

        private void processCommits() {
            handleAcks();
            this.count += this.acks.size();
            AbstractMessageListenerContainer.AckMode ackMode = this.containerProperties.getAckMode();
            if (this.isManualImmediateAck) {
                return;
            }
            if (!this.isManualAck) {
                updatePendingOffsets();
            }
            boolean z = this.count >= this.containerProperties.getAckCount();
            if (this.isManualAck || this.isBatchAck || this.isRecordAck || (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) && z)) {
                if (this.logger.isDebugEnabled() && ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT)) {
                    this.logger.debug("Committing in AckMode.COUNT because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
                }
                commitIfNecessary();
                this.count = 0;
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            boolean z2 = currentTimeMillis - this.last > this.containerProperties.getAckTime();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) && z2) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Committing in AckMode.TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                }
                commitIfNecessary();
                this.last = currentTimeMillis;
                return;
            }
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME)) {
                if (z2 || z) {
                    if (this.logger.isDebugEnabled()) {
                        if (z2) {
                            this.logger.debug("Committing in AckMode.COUNT_TIME because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                        } else {
                            this.logger.debug("Committing in AckMode.COUNT_TIME because count " + this.count + " exceeds configured limit of" + this.containerProperties.getAckCount());
                        }
                    }
                    commitIfNecessary();
                    this.last = currentTimeMillis;
                    this.count = 0;
                }
            }
        }

        private void initPartitionsIfNeeded() {
            for (Map.Entry<TopicPartition, Long> entry : this.definedPartitions.entrySet()) {
                TopicPartition key = entry.getKey();
                Long value = entry.getValue();
                if (value != null) {
                    long longValue = value.longValue();
                    if (value.longValue() < 0) {
                        this.consumer.seekToEnd(new TopicPartition[]{key});
                        longValue = Math.max(0L, this.consumer.position(key) + value.longValue());
                    }
                    try {
                        this.consumer.seek(key, longValue);
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Reset " + key + " to offset " + longValue);
                        }
                    } catch (Exception e) {
                        this.logger.error("Failed to set initial offset for " + key + " at " + longValue + ". Positioned to " + this.consumer.position(key), e);
                    }
                }
            }
        }

        private void updatePendingOffsets() {
            ConsumerRecord<K, V> poll = this.acks.poll();
            while (true) {
                ConsumerRecord<K, V> consumerRecord = poll;
                if (consumerRecord == null) {
                    return;
                }
                addOffset(consumerRecord);
                poll = this.acks.poll();
            }
        }

        private void addOffset(ConsumerRecord<K, V> consumerRecord) {
            if (!this.offsets.containsKey(consumerRecord.topic())) {
                this.offsets.put(consumerRecord.topic(), new HashMap());
            }
            this.offsets.get(consumerRecord.topic()).put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()));
        }

        private void commitIfNecessary() {
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
                for (Map.Entry<Integer, Long> entry2 : entry.getValue().entrySet()) {
                    hashMap.put(new TopicPartition(entry.getKey(), entry2.getKey().intValue()), new OffsetAndMetadata(entry2.getValue().longValue() + 1));
                }
            }
            this.offsets.clear();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Commit list: " + hashMap);
            }
            if (hashMap.isEmpty()) {
                return;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Committing: " + hashMap);
            }
            try {
                if (this.containerProperties.isSyncCommits()) {
                    this.consumer.commitSync(hashMap);
                } else {
                    this.consumer.commitAsync(hashMap, this.commitCallback);
                }
            } catch (WakeupException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Woken up during commit");
                }
            }
        }
    }

    /* loaded from: input_file:org/springframework/kafka/listener/KafkaMessageListenerContainer$LoggingCommitCallback.class */
    private static final class LoggingCommitCallback implements OffsetCommitCallback {
        private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);

        private LoggingCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            if (exc != null) {
                logger.error("Commit failed for " + map, exc);
            } else if (logger.isDebugEnabled()) {
                logger.debug("Commits for " + map + " completed");
            }
        }
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        this(consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

    public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitionInitialOffsetArr) {
        super(containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        this.consumerFactory = consumerFactory;
        if (topicPartitionInitialOffsetArr != null) {
            this.topicPartitions = (TopicPartitionInitialOffset[]) Arrays.copyOf(topicPartitionInitialOffsetArr, topicPartitionInitialOffsetArr.length);
        } else {
            this.topicPartitions = containerProperties.getTopicPartitions();
        }
    }

    public Collection<TopicPartition> getAssignedPartitions() {
        if (((ListenerConsumer) this.listenerConsumer).definedPartitions != null) {
            return Collections.unmodifiableCollection(((ListenerConsumer) this.listenerConsumer).definedPartitions.keySet());
        }
        if (((ListenerConsumer) this.listenerConsumer).assignedPartitions != null) {
            return Collections.unmodifiableCollection(((ListenerConsumer) this.listenerConsumer).assignedPartitions);
        }
        return null;
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        if (!this.consumerFactory.isAutoCommit()) {
            AbstractMessageListenerContainer.AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT) || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AbstractMessageListenerContainer.AckMode.TIME) || ackMode.equals(AbstractMessageListenerContainer.AckMode.COUNT_TIME)) && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000L);
            }
        }
        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener) messageListener;
        } else {
            if (!(messageListener instanceof MessageListener)) {
                throw new IllegalStateException("messageListener must be 'MessageListener' or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
            }
            this.listener = (MessageListener) messageListener;
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            containerProperties.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-kafka-consumer-"));
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            containerProperties.setListenerTaskExecutor(new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-kafka-listener-"));
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop(final Runnable runnable) {
        if (isRunning()) {
            this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() { // from class: org.springframework.kafka.listener.KafkaMessageListenerContainer.1
                public void onFailure(Throwable th) {
                    KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", th);
                    if (runnable != null) {
                        runnable.run();
                    }
                }

                public void onSuccess(Object obj) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger.debug(KafkaMessageListenerContainer.this + " stopped normally");
                    }
                    if (runnable != null) {
                        runnable.run();
                    }
                }
            });
            setRunning(false);
            ((ListenerConsumer) this.listenerConsumer).consumer.wakeup();
        }
    }

    public String toString() {
        return "KafkaMessageListenerContainer [id=" + getBeanName() + ", topicPartitions=" + getAssignedPartitions() + "]";
    }
}
