/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.event.ConsumerFailedToStartEvent;
import org.springframework.kafka.event.ConsumerPartitionPausedEvent;
import org.springframework.kafka.event.ConsumerPartitionResumedEvent;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
import org.springframework.kafka.event.ConsumerRetryAuthSuccessfulEvent;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppingEvent;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
import org.springframework.kafka.event.ListenerContainerPartitionNoLongerIdleEvent;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerPauseResumeEventPublisher;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.DefaultListenerMetadata;
import org.springframework.kafka.listener.DelegatingMessageListener;
import org.springframework.kafka.listener.DeliveryAttemptAware;
import org.springframework.kafka.listener.ErrorHandlingUtils;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.ListenerMetadata;
import org.springframework.kafka.listener.ListenerType;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.listener.LoggingCommitCallback;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.OffsetAndMetadataProvider;
import org.springframework.kafka.listener.RecordInRetryException;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.listener.ThreadStateProcessor;
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;
import org.springframework.kafka.support.micrometer.MicrometerHolder;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public class KafkaMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V>
implements ConsumerPauseResumeEventPublisher {
    // NOTE(review): UNUSED / UNCHECKED / RAWTYPES are not referenced in the visible part of this
    // file; presumably they were @SuppressWarnings arguments lost in decompilation - confirm.
    private static final String UNUSED = "unused";
    private static final String UNCHECKED = "unchecked";
    private static final String RAWTYPES = "rawtypes";
    /** Default values of the Kafka consumer configuration, as reported by {@code ConsumerConfig.configDef()}. */
    private static final Map<String, Object> CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues();
    /** The container passed to the constructor, or this container itself when none was given. */
    private final AbstractMessageListenerContainer<K, V> thisOrParentContainer;
    /** Copy of the constructor's explicit partitions, or the container properties' topic partitions. */
    private final TopicPartitionOffset[] topicPartitions;
    /** Set through {@link #setClientIdSuffix(String)}; {@code doStart()} only calls {@code checkTopics()} while this is null. */
    private String clientIdSuffix;
    /** Replaceable through {@link #setEmergencyStop(Runnable)}; by default stops this container abnormally with a no-op callback. */
    private Runnable emergencyStop = () -> this.stopAbnormally(() -> {});
    /** The consumer runnable created by {@code doStart()}; null until the container has been started. */
    private volatile ListenerConsumer listenerConsumer;
    /** Future returned by the task executor when {@code doStart()} submits the listener consumer. */
    private volatile CompletableFuture<Void> listenerConsumerFuture;
    /** Awaited by {@code doStart()} and counted down by {@code publishConsumerStartingEvent()}. */
    private volatile CountDownLatch startLatch = new CountDownLatch(1);

    /**
     * Construct a container with no parent container and no explicit topic partitions.
     * @param consumerFactory the consumer factory.
     * @param containerProperties the container properties.
     */
    public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        // The explicit array cast makes this a non-varargs call that passes a null array.
        this(null, consumerFactory, containerProperties, (TopicPartitionOffset[]) null);
    }

    /**
     * Construct a container on behalf of the given container, with no explicit topic partitions.
     * @param container the container recorded as "this or parent"; when null, this container is used.
     * @param consumerFactory the consumer factory.
     * @param containerProperties the container properties.
     */
    KafkaMessageListenerContainer(AbstractMessageListenerContainer<K, V> container,
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        // The explicit array cast makes this a non-varargs call that passes a null array.
        this(container, consumerFactory, containerProperties, (TopicPartitionOffset[]) null);
    }

    /**
     * Construct a container, optionally on behalf of another container and optionally with
     * explicit topic partitions.
     * @param container the owning container, or null to make this container its own "parent".
     * @param consumerFactory the consumer factory; must not be null.
     * @param containerProperties the container properties.
     * @param topicPartitions explicit partitions (copied defensively); when null, the
     * partitions configured in the container properties are used instead.
     */
    KafkaMessageListenerContainer(@Nullable AbstractMessageListenerContainer<K, V> container,
            ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties, TopicPartitionOffset... topicPartitions) {

        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
        if (container != null) {
            this.thisOrParentContainer = container;
        }
        else {
            this.thisOrParentContainer = this;
        }
        if (topicPartitions == null) {
            this.topicPartitions = containerProperties.getTopicPartitions();
        }
        else {
            this.topicPartitions = Arrays.copyOf(topicPartitions, topicPartitions.length);
        }
    }

    /**
     * Replace the emergency stop action.
     * @param emergencyStop the new action; must not be null.
     */
    public void setEmergencyStop(Runnable emergencyStop) {
        Assert.notNull(emergencyStop, "'emergencyStop' cannot be null");
        this.emergencyStop = emergencyStop;
    }

    /**
     * Set the client id suffix for this container.
     * While the suffix is null, {@code doStart()} calls {@code checkTopics()} before starting.
     * @param clientIdSuffix the suffix.
     */
    public void setClientIdSuffix(String clientIdSuffix) {
        this.clientIdSuffix = clientIdSuffix;
    }

    /**
     * Return an unmodifiable view of the partitions this container is consuming from.
     * The keys of the listener consumer's defined partitions take precedence over its
     * assigned partitions.
     * @return the partitions, or null when there is no listener consumer or neither
     * collection is available.
     */
    @Override
    @Nullable
    public Collection<TopicPartition> getAssignedPartitions() {
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return null;
        }
        if (current.definedPartitions != null) {
            return Collections.unmodifiableCollection(current.definedPartitions.keySet());
        }
        return current.assignedPartitions != null
                ? Collections.unmodifiableCollection(current.assignedPartitions)
                : null;
    }

    /**
     * Return the assigned partitions keyed by the listener consumer's client id.
     * @return a single-entry map, or null when there is no listener consumer.
     */
    @Override
    @Nullable
    public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return null;
        }
        return Collections.singletonMap(current.getClientId(), getAssignedPartitions());
    }

    /**
     * Report whether a pause was requested and the listener consumer has actually paused.
     * @return true only when a pause is requested, a listener consumer exists, and that
     * consumer reports itself paused.
     */
    @Override
    public boolean isContainerPaused() {
        // Snapshot the volatile field once so the null check and the call use the same instance.
        ListenerConsumer current = this.listenerConsumer;
        return isPauseRequested() && current != null && current.isConsumerPaused();
    }

    /**
     * Report whether the listener consumer considers the given partition paused.
     * @param topicPartition the partition to check.
     * @return false when there is no listener consumer, otherwise the consumer's answer.
     */
    @Override
    public boolean isPartitionPaused(TopicPartition topicPartition) {
        // Snapshot the volatile field once so the null check and the call use the same instance.
        ListenerConsumer current = this.listenerConsumer;
        return current != null && current.isPartitionPaused(topicPartition);
    }

    /**
     * Report whether the container is in an expected state.
     * @return true when the container is running or was stopped normally.
     */
    @Override
    public boolean isInExpectedState() {
        return isRunning() || isStoppedNormally();
    }

    /**
     * Flag a rebalance request on the owning container and wake the listener consumer,
     * if one exists.
     */
    @Override
    public void enforceRebalance() {
        this.thisOrParentContainer.enforceRebalanceRequested.set(true);
        consumerWakeIfNecessary();
    }

    /**
     * Run the superclass pause handling, then wake the listener consumer if one exists.
     */
    @Override
    public void pause() {
        super.pause();
        consumerWakeIfNecessary();
    }

    /**
     * Run the superclass resume handling, then wake the listener consumer if one exists.
     */
    @Override
    public void resume() {
        super.resume();
        consumerWakeIfNecessary();
    }

    /**
     * Run the superclass handling for resuming a single partition, then wake the listener
     * consumer if one exists.
     * @param topicPartition the partition to resume.
     */
    @Override
    public void resumePartition(TopicPartition topicPartition) {
        super.resumePartition(topicPartition);
        consumerWakeIfNecessary();
    }

    /**
     * Ask the current listener consumer, if any, to wake up if necessary.
     */
    private void consumerWakeIfNecessary() {
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return;
        }
        current.wakeIfNecessary();
    }

    /**
     * Return the Kafka consumer's metrics keyed by the listener consumer's client id.
     * @return a single-entry map, or an empty map when there is no listener consumer.
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        ListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return Collections.emptyMap();
        }
        Map<MetricName, ? extends Metric> consumerMetrics = current.consumer.metrics();
        return Collections.singletonMap(current.getClientId(), consumerMetrics);
    }

    /**
     * Start the container: create the listener consumer, submit it to the listener task
     * executor, and wait up to the configured consumer start timeout for the consumer
     * thread to signal that it is starting. Does nothing when already running.
     */
    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) {
            checkTopics();
        }
        ContainerProperties props = getContainerProperties();
        Object configuredListener = props.getMessageListener();
        AsyncTaskExecutor executor = props.getListenerTaskExecutor();
        if (executor == null) {
            // No executor configured: create one and store it back on the properties.
            executor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
            props.setListenerTaskExecutor(executor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) configuredListener;
        ListenerType listenerType = determineListenerType(listener);
        ObservationRegistry registry = props.getObservationRegistry();
        if (registry.isNoop()) {
            // Fall back to a unique registry bean, but only when observation is enabled.
            ApplicationContext context = getApplicationContext();
            if (context != null && props.isObservationEnabled()) {
                ObservationRegistry uniqueBean = context.getBeanProvider(ObservationRegistry.class).getIfUnique();
                if (uniqueBean != null) {
                    registry = uniqueBean;
                }
            }
        }
        this.listenerConsumer = new ListenerConsumer(listener, listenerType, registry);
        setRunning(true);
        this.startLatch = new CountDownLatch(1);
        this.listenerConsumerFuture = executor.submitCompletable((Runnable) this.listenerConsumer);
        try {
            boolean started = this.startLatch.await(props.getConsumerStartTimeout().toMillis(),
                    TimeUnit.MILLISECONDS);
            if (!started) {
                this.logger.error("Consumer thread failed to start - does the configured task executor "
                        + "have enough threads to support all containers and concurrency?");
                publishConsumerFailedToStart();
            }
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Determine the listener type of the innermost delegate of the given listener.
     * Any chain of {@link DelegatingMessageListener} wrappers is unwrapped first.
     * @param listener the (possibly delegating) listener.
     * @return the type reported by {@code ListenerUtils.determineListenerType} for the
     * innermost delegate.
     */
    private ListenerType determineListenerType(GenericMessageListener<?> listener) {
        // Walk the chain as Object: a delegate need not be a GenericMessageListener<Object>,
        // and assigning the wildcard-typed listener to that type does not compile.
        Object delegating = listener;
        while (delegating instanceof DelegatingMessageListener) {
            delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
        }
        return ListenerUtils.determineListenerType(delegating);
    }

    /**
     * Stop the container if it is running. The callback is attached to the consumer
     * future before the running flag is cleared and the consumer is woken for the stop.
     * @param callback passed to the {@code StopCallback} registered on the consumer future.
     * @param normal recorded as the "stopped normally" state.
     */
    @Override
    protected void doStop(Runnable callback, boolean normal) {
        if (!isRunning()) {
            return;
        }
        this.listenerConsumerFuture.whenComplete((BiConsumer) new StopCallback(callback));
        setRunning(false);
        this.listenerConsumer.wakeIfNecessaryForStop();
        setStoppedNormally(normal);
    }

    /**
     * Restart this container after it stopped for an authentication/authorization reason,
     * provided the stopped child is this container and restart after auth exceptions is
     * enabled in the container properties.
     * @param child the container that stopped.
     * @param reason the reason it stopped.
     */
    @Override
    public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) {
        boolean stoppedForAuth = reason.equals(ConsumerStoppedEvent.Reason.AUTH);
        if (!stoppedForAuth || !child.equals(this)) {
            return;
        }
        if (getContainerProperties().isRestartAfterAuthExceptions()) {
            setStoppedNormally(true);
            start();
        }
    }

    /**
     * Publish a {@link ListenerContainerPartitionIdleEvent}, if an event publisher is available.
     * @param idleTime the idle time passed to the event.
     * @param topicPartition the idle partition.
     * @param consumer the consumer.
     * @param paused the paused flag passed to the event.
     */
    private void publishIdlePartitionEvent(long idleTime, TopicPartition topicPartition, Consumer<K, V> consumer,
            boolean paused) {

        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ListenerContainerPartitionIdleEvent(this, this.thisOrParentContainer,
                idleTime, getBeanName(), topicPartition, consumer, paused);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ListenerContainerPartitionNoLongerIdleEvent}, if an event publisher
     * is available.
     * @param idleTime the idle time passed to the event.
     * @param consumer the consumer.
     * @param topicPartition the partition that is no longer idle.
     */
    private void publishNoLongerIdlePartitionEvent(long idleTime, Consumer<K, V> consumer,
            TopicPartition topicPartition) {

        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ListenerContainerPartitionNoLongerIdleEvent(this, this.thisOrParentContainer,
                idleTime, getBeanName(), topicPartition, consumer);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ListenerContainerIdleEvent} carrying the currently assigned
     * partitions, if an event publisher is available.
     * @param idleTime the idle time passed to the event.
     * @param consumer the consumer.
     * @param paused the paused flag passed to the event.
     */
    private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer, boolean paused) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ListenerContainerIdleEvent(this, this.thisOrParentContainer, idleTime,
                getBeanName(), getAssignedPartitions(), consumer, paused);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ListenerContainerNoLongerIdleEvent} carrying the currently assigned
     * partitions, if an event publisher is available.
     * @param idleTime the idle time passed to the event.
     * @param consumer the consumer.
     */
    private void publishNoLongerIdleContainerEvent(long idleTime, Consumer<?, ?> consumer) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ListenerContainerNoLongerIdleEvent(this, this.thisOrParentContainer,
                idleTime, getBeanName(), getAssignedPartitions(), consumer);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link NonResponsiveConsumerEvent} carrying the currently assigned
     * partitions, if an event publisher is available.
     * @param timeSinceLastPoll the time since the last poll, passed to the event.
     * @param consumer the consumer.
     */
    private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<?, ?> consumer) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new NonResponsiveConsumerEvent(this, this.thisOrParentContainer,
                timeSinceLastPoll, getBeanName(), getAssignedPartitions(), consumer);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerPausedEvent} with an unmodifiable view of the given
     * partitions, if an event publisher is available.
     * @param partitions the paused partitions.
     * @param reason the reason passed to the event.
     */
    @Override
    public void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerPausedEvent(this, this.thisOrParentContainer,
                Collections.unmodifiableCollection(partitions), reason);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerResumedEvent} with an unmodifiable view of the given
     * partitions, if an event publisher is available.
     * @param partitions the resumed partitions.
     */
    @Override
    public void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerResumedEvent(this, this.thisOrParentContainer,
                Collections.unmodifiableCollection(partitions));
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerPartitionPausedEvent}, if an event publisher is available.
     * @param partition the paused partition.
     */
    private void publishConsumerPartitionPausedEvent(TopicPartition partition) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerPartitionPausedEvent(this, this.thisOrParentContainer, partition);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerPartitionResumedEvent}, if an event publisher is available.
     * @param partition the resumed partition.
     */
    private void publishConsumerPartitionResumedEvent(TopicPartition partition) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerPartitionResumedEvent(this, this.thisOrParentContainer, partition);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerStoppingEvent} carrying the currently assigned partitions,
     * if an event publisher is available. Any exception raised while doing so is logged
     * and not propagated.
     * @param consumer the consumer that is stopping.
     */
    private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
        try {
            ApplicationEventPublisher publisher = getApplicationEventPublisher();
            if (publisher == null) {
                return;
            }
            ApplicationEvent event = new ConsumerStoppingEvent(this, this.thisOrParentContainer, consumer,
                    getAssignedPartitions());
            publisher.publishEvent(event);
        }
        catch (Exception ex) {
            this.logger.error(ex, "Failed to publish consumer stopping event");
        }
    }

    /**
     * Publish a {@link ConsumerStoppedEvent} whose reason is derived from the given
     * throwable, then notify the owning container through {@code childStopped}.
     * Both steps happen only when an event publisher is available.
     * @param throwable the throwable that caused the stop, or null.
     */
    private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            // NOTE(review): childStopped() is skipped too when there is no publisher - confirm intended.
            return;
        }
        ConsumerStoppedEvent.Reason reason;
        if (throwable instanceof Error) {
            reason = ConsumerStoppedEvent.Reason.ERROR;
        }
        else if (throwable instanceof StopAfterFenceException || throwable instanceof FencedInstanceIdException) {
            reason = ConsumerStoppedEvent.Reason.FENCED;
        }
        else if (throwable instanceof AuthenticationException || throwable instanceof AuthorizationException) {
            reason = ConsumerStoppedEvent.Reason.AUTH;
        }
        else if (throwable instanceof NoOffsetForPartitionException) {
            reason = ConsumerStoppedEvent.Reason.NO_OFFSET;
        }
        else {
            reason = ConsumerStoppedEvent.Reason.NORMAL;
        }
        ApplicationEvent event = new ConsumerStoppedEvent(this, this.thisOrParentContainer, reason);
        publisher.publishEvent(event);
        this.thisOrParentContainer.childStopped(this, reason);
    }

    /**
     * Count down the start latch awaited by {@code doStart()}, then publish a
     * {@link ConsumerStartingEvent} if an event publisher is available.
     */
    private void publishConsumerStartingEvent() {
        // The latch is released whether or not there is a publisher.
        this.startLatch.countDown();
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerStartingEvent(this, this.thisOrParentContainer);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerStartedEvent}, if an event publisher is available.
     */
    private void publishConsumerStartedEvent() {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerStartedEvent(this, this.thisOrParentContainer);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerFailedToStartEvent}, if an event publisher is available.
     */
    private void publishConsumerFailedToStart() {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerFailedToStartEvent(this, this.thisOrParentContainer);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerRetryAuthEvent} whose reason matches the given throwable,
     * if an event publisher is available.
     * @param throwable an authentication or authorization exception.
     * @throws IllegalArgumentException if a publisher is available and the throwable is
     * neither an {@link AuthenticationException} nor an {@link AuthorizationException}.
     */
    private void publishRetryAuthEvent(Throwable throwable) {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ConsumerRetryAuthEvent.Reason reason;
        if (throwable instanceof AuthenticationException) {
            reason = ConsumerRetryAuthEvent.Reason.AUTHENTICATION;
        }
        else if (throwable instanceof AuthorizationException) {
            reason = ConsumerRetryAuthEvent.Reason.AUTHORIZATION;
        }
        else {
            throw new IllegalArgumentException("Only Authentication or Authorization Exceptions are allowed",
                    throwable);
        }
        ApplicationEvent event = new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason);
        publisher.publishEvent(event);
    }

    /**
     * Publish a {@link ConsumerRetryAuthSuccessfulEvent}, if an event publisher is available.
     */
    private void publishRetryAuthSuccessfulEvent() {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        ApplicationEvent event = new ConsumerRetryAuthSuccessfulEvent(this, this.thisOrParentContainer);
        publisher.publishEvent(event);
    }

    /**
     * Return the container given at construction time, or this container when none was given.
     * @return the parent container or this.
     */
    @Override
    protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
        return this.thisOrParentContainer;
    }

    /**
     * Describe this container by bean name, optional client id suffix and assigned partitions.
     * @return the description; the partitions are shown as "none assigned" when there are none.
     */
    @Override
    public String toString() {
        // Look the partitions up once so the null test and the printed value agree.
        Collection<TopicPartition> assigned = getAssignedPartitions();
        return "KafkaMessageListenerContainer [id=" + getBeanName()
                + (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
                + ", topicPartitions=" + (assigned == null ? "none assigned" : assigned)
                + "]";
    }

    private final class ListenerConsumer
    implements SchedulingAwareRunnable,
    ConsumerSeekAware.ConsumerSeekCallback {
        private static final String COMMITTING = "Committing: ";
        private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception";
        private final LogAccessor logger;
        private final ContainerProperties containerProperties;
        private final OffsetCommitCallback commitCallback;
        private final OffsetAndMetadataProvider offsetAndMetadataProvider;
        private final ListenerMetadata listenerMetadata;
        private final Consumer<K, V> consumer;
        private final Map<TopicPartition, Long> offsets;
        private final Collection<TopicPartition> assignedPartitions;
        private final Map<TopicPartition, OffsetAndMetadata> lastCommits;
        private final Map<TopicPartition, Long> savedPositions;
        private final GenericMessageListener<?> genericListener;
        private final ConsumerSeekAware consumerSeekAwareListener;
        private final MessageListener<K, V> listener;
        private final BatchMessageListener<K, V> batchListener;
        private final ListenerType listenerType;
        private final boolean isConsumerAwareListener;
        private final boolean isBatchListener;
        private final boolean wantsFullRecords;
        private final boolean wantsBatchRecoverAfterRollback;
        private final boolean asyncReplies;
        private final boolean autoCommit;
        private final ContainerProperties.AckMode ackMode;
        private final boolean isManualAck;
        private final boolean isCountAck;
        private final boolean isTimeOnlyAck;
        private final boolean isTimeAck;
        private final boolean isManualImmediateAck;
        private final boolean isAnyManualAck;
        private final boolean isRecordAck;
        private final BlockingQueue<ConsumerRecord<K, V>> acks;
        private final BlockingQueue<TopicPartitionOffset> seeks;
        private final CommonErrorHandler commonErrorHandler;
        @Deprecated(since="3.2", forRemoval=true)
        private final PlatformTransactionManager transactionManager;
        private final KafkaAwareTransactionManager<?, ?> kafkaTxManager;
        private final TransactionTemplate transactionTemplate;
        private final String consumerGroupId;
        private final TaskScheduler taskScheduler;
        private final ScheduledFuture<?> monitorTask;
        private final LogIfLevelEnabled commitLogger;
        private final Duration pollTimeout;
        private final Duration pollTimeoutWhilePaused;
        private final boolean checkNullKeyForExceptions;
        private final boolean checkNullValueForExceptions;
        private final boolean syncCommits;
        private final Duration syncCommitTimeout;
        private final RecordInterceptor<K, V> recordInterceptor;
        private final RecordInterceptor<K, V> earlyRecordInterceptor;
        private final RecordInterceptor<K, V> commonRecordInterceptor;
        private final BatchInterceptor<K, V> batchInterceptor;
        private final BatchInterceptor<K, V> earlyBatchInterceptor;
        private final BatchInterceptor<K, V> commonBatchInterceptor;
        private final ThreadStateProcessor pollThreadStateProcessor;
        private final ConsumerSeekAware.ConsumerSeekCallback seekCallback;
        private final long maxPollInterval;
        private final MicrometerHolder micrometerHolder;
        private final boolean observationEnabled;
        private final AtomicBoolean polling;
        private final boolean subBatchPerPartition;
        private final Duration authExceptionRetryInterval;
        private final ContainerProperties.AssignmentCommitOption autoCommitOption;
        private final boolean commitCurrentOnAssignment;
        private final DeliveryAttemptAware deliveryAttemptAware;
        private final ContainerProperties.EOSMode eosMode;
        private final Map<TopicPartition, OffsetAndMetadata> commitsDuringRebalance;
        private final String clientId;
        private final boolean fixTxOffsets;
        private final boolean stopImmediate;
        private final Set<TopicPartition> pausedPartitions;
        private final Map<TopicPartition, List<Long>> offsetsInThisBatch;
        private final Map<TopicPartition, List<ConsumerRecord<K, V>>> deferredOffsets;
        private final Map<TopicPartition, Long> lastReceivePartition;
        private final Map<TopicPartition, Long> lastAlertPartition;
        private final Map<TopicPartition, Boolean> wasIdlePartition;
        private final byte[] listenerinfo;
        private final Header infoHeader;
        private final Set<TopicPartition> pausedForNack;
        private final boolean pauseImmediate;
        private final ObservationRegistry observationRegistry;
        @Nullable
        private final KafkaAdmin kafkaAdmin;
        private final Object bootstrapServers;
        @Nullable
        private final Function<ConsumerRecord<?, ?>, Map<String, String>> micrometerTagsProvider;
        private String clusterId;
        private Map<TopicPartition, OffsetMetadata> definedPartitions;
        private int count;
        private long last;
        private boolean fatalError;
        private boolean taskSchedulerExplicitlySet;
        private long lastReceive;
        private long lastAlertAt;
        private long nackSleepDurationMillis;
        private long nackWakeTimeMillis;
        private int nackIndex;
        private Iterator<TopicPartition> batchIterator;
        private ConsumerRecords<K, V> lastBatch;
        private Producer<?, ?> producer;
        private boolean wasIdle;
        private boolean batchFailed;
        private boolean pausedForAsyncAcks;
        private boolean receivedSome;
        private ConsumerRecords<K, V> remainingRecords;
        private boolean pauseForPending;
        private boolean firstPoll;
        private volatile boolean consumerPaused;
        private volatile Thread consumerThread;
        private volatile long lastPoll;
        private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords;

        ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType, ObservationRegistry observationRegistry) {
            ApplicationContext applicationContext;
            AsyncRepliesAware hmd;
            KafkaAwareTransactionManager kafkaAwareTransactionManager;
            this.logger = KafkaMessageListenerContainer.this.logger;
            this.containerProperties = KafkaMessageListenerContainer.this.getContainerProperties();
            this.commitCallback = this.containerProperties.getCommitCallback() != null ? this.containerProperties.getCommitCallback() : new LoggingCommitCallback();
            this.offsetAndMetadataProvider = this.containerProperties.getOffsetAndMetadataProvider() == null ? (metadata, offset) -> new OffsetAndMetadata(offset) : this.containerProperties.getOffsetAndMetadataProvider();
            this.listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this);
            this.offsets = new LinkedHashMap<TopicPartition, Long>();
            this.assignedPartitions = Collections.synchronizedSet(new LinkedHashSet());
            this.lastCommits = new HashMap<TopicPartition, OffsetAndMetadata>();
            this.savedPositions = new HashMap<TopicPartition, Long>();
            this.acks = new LinkedBlockingQueue();
            this.seeks = new LinkedBlockingQueue<TopicPartitionOffset>();
            PlatformTransactionManager platformTransactionManager = this.transactionManager = this.containerProperties.getKafkaAwareTransactionManager() != null ? this.containerProperties.getKafkaAwareTransactionManager() : this.containerProperties.getTransactionManager();
            this.kafkaTxManager = platformTransactionManager instanceof KafkaAwareTransactionManager ? (kafkaAwareTransactionManager = (KafkaAwareTransactionManager)platformTransactionManager) : null;
            this.consumerGroupId = KafkaMessageListenerContainer.this.getGroupId();
            this.commitLogger = new LogIfLevelEnabled(this.logger, this.containerProperties.getCommitLogLevel());
            this.pollTimeout = Duration.ofMillis(this.containerProperties.getPollTimeout());
            this.pollTimeoutWhilePaused = this.containerProperties.getPollTimeoutWhilePaused();
            this.syncCommits = this.containerProperties.isSyncCommits();
            this.recordInterceptor = !KafkaMessageListenerContainer.this.isInterceptBeforeTx() || this.transactionManager == null ? KafkaMessageListenerContainer.this.getRecordInterceptor() : null;
            this.earlyRecordInterceptor = KafkaMessageListenerContainer.this.isInterceptBeforeTx() && this.transactionManager != null ? KafkaMessageListenerContainer.this.getRecordInterceptor() : null;
            this.commonRecordInterceptor = KafkaMessageListenerContainer.this.getRecordInterceptor();
            this.batchInterceptor = !KafkaMessageListenerContainer.this.isInterceptBeforeTx() || this.transactionManager == null ? KafkaMessageListenerContainer.this.getBatchInterceptor() : null;
            this.earlyBatchInterceptor = KafkaMessageListenerContainer.this.isInterceptBeforeTx() && this.transactionManager != null ? KafkaMessageListenerContainer.this.getBatchInterceptor() : null;
            this.commonBatchInterceptor = KafkaMessageListenerContainer.this.getBatchInterceptor();
            this.seekCallback = new InitialOrIdleSeekCallback();
            this.polling = new AtomicBoolean();
            this.subBatchPerPartition = this.containerProperties.isSubBatchPerPartition();
            this.authExceptionRetryInterval = this.containerProperties.getAuthExceptionRetryInterval();
            this.autoCommitOption = this.containerProperties.getAssignmentCommitOption();
            this.eosMode = this.containerProperties.getEosMode();
            this.commitsDuringRebalance = new HashMap<TopicPartition, OffsetAndMetadata>();
            this.fixTxOffsets = this.containerProperties.isFixTxOffsets();
            this.stopImmediate = this.containerProperties.isStopImmediate();
            this.pausedPartitions = new HashSet<TopicPartition>();
            this.listenerinfo = KafkaMessageListenerContainer.this.getListenerInfo();
            this.infoHeader = new RecordHeader("kafka_listenerInfo", this.listenerinfo);
            this.pausedForNack = new HashSet<TopicPartition>();
            this.pauseImmediate = this.containerProperties.isPauseImmediate();
            this.micrometerTagsProvider = this.containerProperties.getMicrometerTagsProvider();
            this.last = System.currentTimeMillis();
            this.lastAlertAt = this.lastReceive;
            this.nackSleepDurationMillis = -1L;
            this.lastPoll = System.currentTimeMillis();
            this.failedRecords = new ConcurrentLinkedDeque();
            this.asyncReplies = listener instanceof AsyncRepliesAware && (hmd = (AsyncRepliesAware)((Object)listener)).isAsyncReplies() || this.containerProperties.isAsyncAcks();
            this.ackMode = this.determineAckMode();
            this.isCountAck = ContainerProperties.AckMode.COUNT.equals((Object)this.ackMode) || ContainerProperties.AckMode.COUNT_TIME.equals((Object)this.ackMode);
            this.isTimeOnlyAck = ContainerProperties.AckMode.TIME.equals((Object)this.ackMode);
            this.isTimeAck = this.isTimeOnlyAck || ContainerProperties.AckMode.COUNT_TIME.equals((Object)this.ackMode);
            this.isManualAck = ContainerProperties.AckMode.MANUAL.equals((Object)this.ackMode);
            this.isManualImmediateAck = ContainerProperties.AckMode.MANUAL_IMMEDIATE.equals((Object)this.ackMode);
            this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
            this.isRecordAck = this.ackMode.equals((Object)ContainerProperties.AckMode.RECORD);
            boolean isOutOfCommit = this.isAnyManualAck && this.asyncReplies;
            this.offsetsInThisBatch = isOutOfCommit ? new ConcurrentHashMap() : null;
            this.deferredOffsets = isOutOfCommit ? new ConcurrentHashMap() : null;
            this.observationRegistry = observationRegistry;
            Properties consumerProperties = KafkaMessageListenerContainer.this.propertiesFromConsumerPropertyOverrides();
            this.checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
            this.autoCommit = this.determineAutoCommit(consumerProperties);
            this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId, this.containerProperties.getClientId(), KafkaMessageListenerContainer.this.clientIdSuffix, consumerProperties);
            this.bootstrapServers = this.determineBootstrapServers(consumerProperties);
            this.clientId = this.determineClientId();
            this.transactionTemplate = this.determineTransactionTemplate();
            this.wantsBatchRecoverAfterRollback = this.containerProperties.isBatchRecoverAfterRollback();
            this.genericListener = listener;
            this.consumerSeekAwareListener = this.checkConsumerSeekAware(listener);
            this.commitCurrentOnAssignment = this.determineCommitCurrent(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
            this.subscribeOrAssignTopics(this.consumer);
            if (listener instanceof BatchMessageListener) {
                this.listener = null;
                this.batchListener = (BatchMessageListener)listener;
                this.isBatchListener = true;
                this.wantsFullRecords = this.batchListener.wantsPollResult();
                this.pollThreadStateProcessor = this.setUpPollProcessor(true);
                this.observationEnabled = false;
            } else if (listener instanceof MessageListener) {
                KafkaBackoffAwareMessageListenerAdapter genListener;
                Object t;
                this.listener = (MessageListener)listener;
                this.batchListener = null;
                this.isBatchListener = false;
                this.wantsFullRecords = false;
                this.pollThreadStateProcessor = this.setUpPollProcessor(false);
                this.observationEnabled = this.containerProperties.isObservationEnabled();
                if (!AopUtils.isAopProxy(this.genericListener) && this.genericListener instanceof KafkaBackoffAwareMessageListenerAdapter && (t = (genListener = (KafkaBackoffAwareMessageListenerAdapter)this.genericListener).getDelegate()) instanceof RecordMessagingMessageListenerAdapter) {
                    RecordMessagingMessageListenerAdapter adapterListener = (RecordMessagingMessageListenerAdapter)t;
                    adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
                }
            } else {
                throw new IllegalArgumentException("Listener must be one of 'MessageListener', 'BatchMessageListener', or the variants that are consumer aware and/or Acknowledging not " + listener.getClass().getName());
            }
            this.listenerType = listenerType;
            this.isConsumerAwareListener = listenerType.equals((Object)ListenerType.ACKNOWLEDGING_CONSUMER_AWARE) || listenerType.equals((Object)ListenerType.CONSUMER_AWARE);
            this.commonErrorHandler = this.determineCommonErrorHandler();
            Assert.state((!this.isBatchListener || !this.isRecordAck ? 1 : 0) != 0, (String)"Cannot use AckMode.RECORD with a batch listener");
            if (this.containerProperties.getScheduler() != null) {
                this.taskScheduler = this.containerProperties.getScheduler();
                this.taskSchedulerExplicitlySet = true;
            } else {
                ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
                threadPoolTaskScheduler.initialize();
                this.taskScheduler = threadPoolTaskScheduler;
            }
            this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer, Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
            if (this.containerProperties.isLogContainerConfig()) {
                this.logger.info((CharSequence)this.toString());
            }
            ClassLoader classLoader = (applicationContext = KafkaMessageListenerContainer.this.getApplicationContext()) == null ? this.getClass().getClassLoader() : applicationContext.getClassLoader();
            this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, consumerProperties, false, classLoader);
            this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull() || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, consumerProperties, true, classLoader);
            this.syncCommitTimeout = this.determineSyncCommitTimeout();
            if (this.containerProperties.getSyncCommitTimeout() == null) {
                this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
                if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
                    KafkaMessageListenerContainer.this.thisOrParentContainer.getContainerProperties().setSyncCommitTimeout(this.syncCommitTimeout);
                }
            }
            this.maxPollInterval = this.obtainMaxPollInterval(consumerProperties);
            this.micrometerHolder = this.obtainMicrometerHolder();
            this.deliveryAttemptAware = this.setupDeliveryAttemptAware();
            this.lastReceivePartition = new HashMap<TopicPartition, Long>();
            this.lastAlertPartition = new HashMap<TopicPartition, Long>();
            this.wasIdlePartition = new HashMap<TopicPartition, Boolean>();
            this.kafkaAdmin = this.obtainAdmin();
            if (this.isListenerAdapterObservationAware()) {
                ((RecordMessagingMessageListenerAdapter)this.listener).setObservationRegistry(observationRegistry);
            }
        }

        /**
         * Resolves the acknowledgment mode this consumer will actually use.
         * <p>
         * Starts from the mode configured in the container properties. The mode is
         * forced to {@code MANUAL} when there is no consumer group id but explicit
         * topic partitions are configured, and also when {@code asyncReplies} is set
         * and the mode is not already {@code MANUAL} or {@code MANUAL_IMMEDIATE}.
         *
         * @return the effective ack mode
         */
        private ContainerProperties.AckMode determineAckMode() {
            ContainerProperties.AckMode resolved = this.containerProperties.getAckMode();
            boolean assignedWithoutGroup = this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null;
            if (assignedWithoutGroup) {
                resolved = ContainerProperties.AckMode.MANUAL;
            }
            if (this.asyncReplies) {
                boolean alreadyManual = ContainerProperties.AckMode.MANUAL_IMMEDIATE.equals(resolved) || ContainerProperties.AckMode.MANUAL.equals(resolved);
                if (!alreadyManual) {
                    resolved = ContainerProperties.AckMode.MANUAL;
                }
            }
            return resolved;
        }

        /**
         * Looks up the {@code bootstrap.servers} setting.
         * <p>
         * The value in the supplied consumer property overrides wins; otherwise the
         * consumer factory's configuration is consulted.
         *
         * @param consumerProperties the consumer property overrides
         * @return the configured value, or {@code null} if neither source defines it
         */
        @Nullable
        private Object determineBootstrapServers(Properties consumerProperties) {
            String override = consumerProperties.getProperty("bootstrap.servers");
            if (override != null) {
                return override;
            }
            return KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get("bootstrap.servers");
        }

        /**
         * Determines the {@link KafkaAdmin} to use for this consumer.
         * <p>
         * An admin configured on this (or the parent) container is always returned
         * as-is. Otherwise, only when observation is enabled and an application
         * context is available, the unique {@link KafkaAdmin} bean (if any) is used.
         * If that bean's {@code bootstrap.servers} differs from the servers this
         * consumer uses, a new admin is created from a copy of its configuration
         * with this consumer's servers and the same operation timeout.
         *
         * @return the admin to use, or {@code null} if none is available
         */
        @Nullable
        private KafkaAdmin obtainAdmin() {
            KafkaAdmin customAdmin = KafkaMessageListenerContainer.this.thisOrParentContainer.getKafkaAdmin();
            if (customAdmin != null || !this.observationEnabled) {
                return customAdmin;
            }
            ApplicationContext applicationContext = KafkaMessageListenerContainer.this.getApplicationContext();
            if (applicationContext == null) {
                return null;
            }
            KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
            if (admin != null) {
                Map<String, Object> props = new HashMap<String, Object>(admin.getConfigurationProperties());
                // Null-safe comparison: the admin configuration may not contain the key at all,
                // in which case calling equals() on the looked-up value would throw.
                if (!Objects.equals(props.get("bootstrap.servers"), this.bootstrapServers)) {
                    props.put("bootstrap.servers", this.bootstrapServers);
                    int opTo = admin.getOperationTimeout();
                    admin = new KafkaAdmin(props);
                    admin.setOperationTimeout(opTo);
                }
            }
            return admin;
        }

        /**
         * Returns the cached cluster id, first attempting to obtain it when it is
         * not yet known and an admin is available.
         *
         * @return the cluster id, or {@code null} if it is still unknown
         */
        @Nullable
        private String clusterId() {
            boolean lookupNeeded = this.clusterId == null && this.kafkaAdmin != null;
            if (lookupNeeded) {
                this.obtainClusterId();
            }
            return this.clusterId;
        }

        /**
         * Stores the cluster id reported by the admin in {@code clusterId}; does
         * nothing when no admin is available.
         */
        private void obtainClusterId() {
            if (this.kafkaAdmin == null) {
                return;
            }
            this.clusterId = this.kafkaAdmin.clusterId();
        }

        /**
         * Selects the interceptor that acts as the poll-thread state processor.
         *
         * @param batch {@code true} to use the batch interceptor, {@code false} to
         *              use the record interceptor
         * @return the selected interceptor, possibly {@code null}
         */
        @Nullable
        private ThreadStateProcessor setUpPollProcessor(boolean batch) {
            if (batch) {
                return this.commonBatchInterceptor;
            }
            return this.commonRecordInterceptor;
        }

        /**
         * Resolves the error handler for this consumer.
         * <p>
         * The handler configured on the container is used when present. Otherwise a
         * {@link DefaultErrorHandler} is created, but only when no transaction
         * manager is configured.
         *
         * @return the error handler, or {@code null} when none is configured and a
         *         transaction manager is in use
         */
        @Nullable
        private CommonErrorHandler determineCommonErrorHandler() {
            CommonErrorHandler configured = KafkaMessageListenerContainer.this.getCommonErrorHandler();
            if (configured != null) {
                return configured;
            }
            return this.transactionManager == null ? new DefaultErrorHandler() : null;
        }

        /**
         * Returns the value stored in the {@code clientId} field.
         *
         * @return the client id
         */
        String getClientId() {
            return this.clientId;
        }

        /**
         * Derives the client id from the consumer's metrics.
         * <p>
         * Reads the {@code client-id} tag of the first metric name reported by the
         * consumer.
         *
         * @return the tag value, or {@code "unknown.client.id"} when the consumer
         *         reports no metrics
         */
        private String determineClientId() {
            Iterator<MetricName> metricNames = this.consumer.metrics().keySet().iterator();
            if (!metricNames.hasNext()) {
                return "unknown.client.id";
            }
            return metricNames.next().tags().get("client-id");
        }

        /**
         * Appends the container's client id suffix to {@code group.instance.id}.
         * <p>
         * The instance id is taken from the supplied properties or, when that has no
         * text, from the consumer factory configuration (only if it is a
         * {@code String}). When both the suffix and the instance id have text, the
         * suffixed value is written into {@code properties}.
         *
         * @param properties      the consumer property overrides; may be modified
         * @param consumerFactory the factory whose configuration is the fallback source
         */
        private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> consumerFactory) {
            String instanceId = properties.getProperty("group.instance.id");
            if (!StringUtils.hasText(instanceId)) {
                Object fromFactory = consumerFactory.getConfigurationProperties().get("group.instance.id");
                if (fromFactory instanceof String) {
                    instanceId = (String) fromFactory;
                }
            }
            String suffix = KafkaMessageListenerContainer.this.clientIdSuffix;
            if (StringUtils.hasText(suffix) && StringUtils.hasText(instanceId)) {
                properties.setProperty("group.instance.id", instanceId + suffix);
            }
        }

        /**
         * Finds the component that supplies delivery attempt information.
         * <p>
         * Only applies when the delivery attempt header is enabled in the container
         * properties. With a transaction manager, the after-rollback processor is
         * used if it implements {@link DeliveryAttemptAware}; without one, the
         * common error handler is used if it reports
         * {@code deliveryAttemptHeader()}.
         *
         * @return the delivery-attempt-aware component, or {@code null} if none applies
         */
        @Nullable
        private DeliveryAttemptAware setupDeliveryAttemptAware() {
            if (!this.containerProperties.isDeliveryAttemptHeader()) {
                return null;
            }
            if (this.transactionManager == null) {
                return this.commonErrorHandler.deliveryAttemptHeader() ? this.commonErrorHandler : null;
            }
            Object rollbackProcessor = KafkaMessageListenerContainer.this.getAfterRollbackProcessor();
            if (rollbackProcessor instanceof DeliveryAttemptAware) {
                return (DeliveryAttemptAware) rollbackProcessor;
            }
            return null;
        }

        /**
         * Decides whether the current position should be committed on assignment.
         * <p>
         * Returns {@code false} for the {@code NEVER} option, and {@code true} for
         * {@code ALWAYS} when auto commit is off. Otherwise the result is
         * {@code true} only when auto commit is off, {@code auto.offset.reset} is
         * unset or {@code "latest"}, and the option is {@code LATEST_ONLY} or
         * {@code LATEST_ONLY_NO_TX}.
         *
         * @param consumerProperties the consumer property overrides
         * @param factoryConfigs     the consumer factory configuration, used as fallback
         * @return whether to commit the current position on assignment
         */
        private boolean determineCommitCurrent(Properties consumerProperties, Map<String, Object> factoryConfigs) {
            ContainerProperties.AssignmentCommitOption option = this.autoCommitOption;
            if (ContainerProperties.AssignmentCommitOption.NEVER.equals(option)) {
                return false;
            }
            boolean manualCommits = !this.autoCommit;
            if (manualCommits && ContainerProperties.AssignmentCommitOption.ALWAYS.equals(option)) {
                return true;
            }
            String resetPolicy = consumerProperties.getProperty("auto.offset.reset");
            if (resetPolicy == null) {
                Object fromFactory = factoryConfigs.get("auto.offset.reset");
                if (fromFactory instanceof String) {
                    resetPolicy = (String) fromFactory;
                }
            }
            if (!manualCommits) {
                return false;
            }
            if (resetPolicy != null && !"latest".equals(resetPolicy)) {
                return false;
            }
            return ContainerProperties.AssignmentCommitOption.LATEST_ONLY.equals(option) || ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(option);
        }

        /**
         * Resolves {@code max.poll.interval.ms} in milliseconds.
         * <p>
         * The value is read from the supplied overrides, then from the consumer
         * factory configuration. {@link Duration}, {@link Number} and {@link String}
         * values are converted; any other non-null type is logged as a warning and
         * the entry in {@code CONSUMER_CONFIG_DEFAULTS} is used, as it is when the
         * property is absent.
         *
         * @param consumerProperties the consumer property overrides
         * @return the max poll interval in milliseconds
         * @throws NumberFormatException if a {@code String} value is not a valid long
         */
        private long obtainMaxPollInterval(Properties consumerProperties) {
            Object override = consumerProperties.get("max.poll.interval.ms");
            final Object configured = override != null ? override : KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get("max.poll.interval.ms");
            if (configured instanceof Duration) {
                return ((Duration) configured).toMillis();
            }
            if (configured instanceof Number) {
                return ((Number) configured).longValue();
            }
            if (configured instanceof String) {
                return Long.parseLong((String) configured);
            }
            if (configured != null) {
                this.logger.warn(() -> "Unexpected type: " + configured.getClass().getName() + " in property 'max.poll.interval.ms'; using Kafka default.");
            }
            return ((Integer)CONSUMER_CONFIG_DEFAULTS.get("max.poll.interval.ms")).intValue();
        }

        /**
         * Returns the listener as a {@link ConsumerSeekAware} if it implements that
         * interface.
         *
         * @param candidate the listener to inspect
         * @return the listener cast to {@link ConsumerSeekAware}, or {@code null}
         */
        @Nullable
        private ConsumerSeekAware checkConsumerSeekAware(GenericMessageListener<?> candidate) {
            if (candidate instanceof ConsumerSeekAware) {
                return (ConsumerSeekAware) candidate;
            }
            return null;
        }

        /**
         * Returns the current value of the {@code consumerPaused} flag.
         *
         * @return {@code true} if the flag is set
         */
        boolean isConsumerPaused() {
            return this.consumerPaused;
        }

        /**
         * Reports whether the given partition is in the {@code pausedPartitions} set.
         *
         * @param topicPartition the partition to look up
         * @return {@code true} if the set contains the partition
         */
        boolean isPartitionPaused(TopicPartition topicPartition) {
            return this.pausedPartitions.contains(topicPartition);
        }

        /**
         * Builds the {@link TransactionTemplate} for the configured transaction manager.
         * <p>
         * When the container properties supply a transaction definition, its
         * propagation behavior must be REQUIRED or REQUIRES_NEW, and its properties
         * are copied onto the template.
         *
         * @return the template, or {@code null} when no transaction manager is configured
         * @throws IllegalStateException if the definition uses another propagation behavior
         */
        @Nullable
        private TransactionTemplate determineTransactionTemplate() {
            if (this.transactionManager == null) {
                return null;
            }
            TransactionTemplate template = new TransactionTemplate(this.transactionManager);
            TransactionDefinition definition = this.containerProperties.getTransactionDefinition();
            boolean supportedPropagation = definition == null
                    || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED
                    || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW;
            Assert.state(supportedPropagation, "Transaction propagation behavior must be REQUIRED or REQUIRES_NEW");
            if (definition != null) {
                BeanUtils.copyProperties(definition, template);
            }
            return template;
        }

        /**
         * Determines whether the Kafka consumer will auto commit offsets.
         * <p>
         * An {@code enable.auto.commit} override in {@code consumerProperties} wins.
         * Otherwise the consumer factory's setting is used if its configuration
         * contains the key. When neither defines it, {@code "false"} is written into
         * {@code consumerProperties} and auto commit is treated as disabled.
         *
         * @param consumerProperties the consumer property overrides; may be modified
         * @return {@code true} if auto commit is enabled
         * @throws IllegalStateException if auto commit is enabled together with a manual ack mode
         */
        private boolean determineAutoCommit(Properties consumerProperties) {
            String override = consumerProperties.getProperty("enable.auto.commit");
            boolean factoryDefinesIt = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().containsKey("enable.auto.commit");
            final boolean autoCommitEnabled;
            if (override != null) {
                autoCommitEnabled = Boolean.parseBoolean(override);
            } else if (factoryDefinesIt) {
                autoCommitEnabled = KafkaMessageListenerContainer.this.consumerFactory.isAutoCommit();
            } else {
                consumerProperties.setProperty("enable.auto.commit", "false");
                autoCommitEnabled = false;
            }
            boolean compatibleWithAckMode = !this.isAnyManualAck || !autoCommitEnabled;
            Assert.state(compatibleWithAckMode, () -> "Consumer cannot be configured for auto commit for ackMode " + this.ackMode);
            return autoCommitEnabled;
        }

        /**
         * Resolves the timeout used for synchronous commits.
         * <p>
         * An explicit sync commit timeout in the container properties wins.
         * Otherwise {@code default.api.timeout.ms} is read from the container's
         * Kafka consumer properties, then from the consumer factory configuration.
         * {@link Duration}, {@link Number} and {@link String} values are converted;
         * any other non-null type is logged as a warning and the entry in
         * {@code CONSUMER_CONFIG_DEFAULTS} is used, as it is when the property is absent.
         *
         * @return the sync commit timeout
         * @throws NumberFormatException if a {@code String} value is not a valid long
         */
        private Duration determineSyncCommitTimeout() {
            Duration explicitTimeout = this.containerProperties.getSyncCommitTimeout();
            if (explicitTimeout != null) {
                return explicitTimeout;
            }
            Object override = this.containerProperties.getKafkaConsumerProperties().get("default.api.timeout.ms");
            final Object configured = override != null ? override : KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties().get("default.api.timeout.ms");
            if (configured instanceof Duration) {
                return (Duration) configured;
            }
            if (configured instanceof Number) {
                return Duration.ofMillis(((Number) configured).longValue());
            }
            if (configured instanceof String) {
                return Duration.ofMillis(Long.parseLong((String) configured));
            }
            if (configured != null) {
                this.logger.warn(() -> "Unexpected type: " + configured.getClass().getName() + " in property 'default.api.timeout.ms'; defaulting to Kafka default for sync commit timeouts");
            }
            return Duration.ofMillis(((Integer)CONSUMER_CONFIG_DEFAULTS.get("default.api.timeout.ms")).intValue());
        }

        /**
         * Reports whether the record listener is exactly a
         * {@link RecordMessagingMessageListenerAdapter} (subclasses do not match).
         *
         * @return {@code true} if a record listener is set and has that exact class
         */
        private boolean isListenerAdapterObservationAware() {
            if (this.listener == null) {
                return false;
            }
            Class<?> listenerClass = this.listener.getClass();
            return RecordMessagingMessageListenerAdapter.class.equals(listenerClass);
        }

        /**
         * Subscribes the consumer to topics or assigns explicit partitions.
         * <p>
         * When the container has explicit topic partitions, they are recorded in
         * {@code definedPartitions} (in configuration order) and assigned to the
         * consumer. Otherwise the consumer subscribes, with a new rebalance
         * listener, to the configured topic pattern or, failing that, to the
         * configured topic names.
         *
         * @param subscribingConsumer the consumer to subscribe or assign
         */
        private void subscribeOrAssignTopics(Consumer<? super K, ? super V> subscribingConsumer) {
            TopicPartitionOffset[] explicitPartitions = KafkaMessageListenerContainer.this.topicPartitions;
            if (explicitPartitions != null) {
                this.definedPartitions = Collections.synchronizedMap(new LinkedHashMap<TopicPartition, OffsetMetadata>(explicitPartitions.length));
                for (TopicPartitionOffset tpo : explicitPartitions) {
                    OffsetMetadata metadata = new OffsetMetadata(tpo.getOffset(), tpo.isRelativeToCurrent(), tpo.getPosition());
                    this.definedPartitions.put(tpo.getTopicPartition(), metadata);
                }
                subscribingConsumer.assign(new ArrayList<TopicPartition>(this.definedPartitions.keySet()));
                return;
            }
            ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener();
            Pattern topicPattern = this.containerProperties.getTopicPattern();
            if (topicPattern == null) {
                subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
            } else {
                subscribingConsumer.subscribe(topicPattern, rebalanceListener);
            }
        }

        /**
         * Publishes a non-responsive consumer event when too much time has passed
         * since the last poll.
         * <p>
         * The elapsed time is expressed as a multiple of the configured poll timeout
         * and compared against the configured no-poll threshold.
         */
        protected void checkConsumer() {
            long sinceLastPoll = System.currentTimeMillis() - this.lastPoll;
            float elapsedPollTimeouts = (float) sinceLastPoll / (float) this.containerProperties.getPollTimeout();
            boolean unresponsive = elapsedPollTimeouts > this.containerProperties.getNoPollThreshold();
            if (unresponsive) {
                KafkaMessageListenerContainer.this.publishNonResponsiveConsumerEvent(sinceLastPoll, this.consumer);
            }
        }

        /**
         * Creates the {@link MicrometerHolder} for this listener, if applicable.
         * <p>
         * A holder is created only when Micrometer is present, Micrometer is enabled
         * in the container properties, and observation is not enabled. The tags
         * function returns the static container tags and, when a tags provider is
         * configured and a record is supplied, adds the provider's tags for that
         * record on top.
         * <p>
         * Creation is best-effort: an {@link IllegalStateException} raised while
         * building the holder is logged at debug level and {@code null} is returned.
         *
         * @return the holder, or {@code null} if not applicable or creation failed
         */
        @Nullable
        private MicrometerHolder obtainMicrometerHolder() {
            MicrometerHolder holder = null;
            try {
                if (KafkaUtils.MICROMETER_PRESENT && this.containerProperties.isMicrometerEnabled() && !this.observationEnabled) {
                    Function<Object, Map<String, String>> mergedProvider = cr -> this.containerProperties.getMicrometerTags();
                    if (this.micrometerTagsProvider != null) {
                        mergedProvider = cr -> {
                            HashMap<String, String> tags = new HashMap<String, String>(this.containerProperties.getMicrometerTags());
                            if (cr != null) {
                                tags.putAll(this.micrometerTagsProvider.apply((ConsumerRecord)cr));
                            }
                            return tags;
                        };
                    }
                    holder = new MicrometerHolder(KafkaMessageListenerContainer.this.getApplicationContext(), KafkaMessageListenerContainer.this.getBeanName(), "spring.kafka.listener", "Kafka Listener Timer", mergedProvider);
                }
            }
            catch (IllegalStateException ex) {
                // Deliberately non-fatal, but leave a trace instead of swallowing silently.
                this.logger.debug(ex, "Could not create MicrometerHolder; continuing without it");
            }
            return holder;
        }

        /**
         * Hands the current consumer positions to the seek-aware listener.
         * <p>
         * Registers this consumer as the seek callback, collects the consumer's
         * position for each given partition, and then invokes either the listener's
         * idle-container or partitions-assigned callback with those positions.
         *
         * @param partitions the partitions whose positions are reported
         * @param idle       {@code true} to call {@code onIdleContainer},
         *                   {@code false} to call {@code onPartitionsAssigned}
         */
        private void seekPartitions(Collection<TopicPartition> partitions, boolean idle) {
            this.consumerSeekAwareListener.registerSeekCallback(this);
            Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>();
            for (TopicPartition partition : partitions) {
                long position = this.consumer.position(partition);
                positions.put(partition, position);
            }
            if (!idle) {
                this.consumerSeekAwareListener.onPartitionsAssigned(positions, this.seekCallback);
                return;
            }
            this.consumerSeekAwareListener.onIdleContainer(positions, this.seekCallback);
        }

        /**
         * Always returns {@code true}.
         *
         * @return {@code true}
         */
        public boolean isLongLived() {
            return true;
        }

        /**
         * Main loop of the consumer thread.
         * <p>
         * After {@link #initialize()}, repeats while the container is running:
         * handles queued async failures, then polls and invokes the listener.
         * Exceptions from the poll/invoke step are handled as follows:
         * <ul>
         * <li>{@code NoOffsetForPartitionException}, {@code FencedInstanceIdException}
         * and authentication/authorization failures without a retry interval mark
         * the error as fatal and leave the loop;</li>
         * <li>authentication/authorization failures with a retry interval publish a
         * retry event and sleep for that interval; the next successful iteration
         * publishes a retry-successful event;</li>
         * <li>{@code StopAfterFenceException} stops the container;</li>
         * <li>an {@link Error} is marked fatal, wrapped up and rethrown;</li>
         * <li>any other exception goes to {@code handleConsumerException}.</li>
         * </ul>
         * Thread state is cleared after every iteration, and {@code wrapUp} is called
         * with the terminating throwable (or {@code null}) once the loop ends.
         * <p>
         * NOTE: this code was produced by a decompiler, which reported
         * "Removed try catching itself - possible behaviour change" for this method.
         */
        public void run() {
            this.initialize();
            Throwable exitThrowable = null;
            boolean failedAuthRetry = false;
            this.lastReceive = System.currentTimeMillis();
            while (KafkaMessageListenerContainer.this.isRunning()) {
                try {
                    this.handleAsyncFailure();
                }
                catch (Exception e) {
                    // Pass the exception to the logger so the cause of the failure is not lost.
                    this.logger.error(e, "Failed to process async retry messages. skip this time, try it again next loop.");
                }
                try {
                    this.pollAndInvoke();
                    if (failedAuthRetry) {
                        KafkaMessageListenerContainer.this.publishRetryAuthSuccessfulEvent();
                        failedAuthRetry = false;
                    }
                }
                catch (NoOffsetForPartitionException nofpe) {
                    this.fatalError = true;
                    this.logger.error(nofpe, "No offset and no reset policy");
                    exitThrowable = nofpe;
                    break;
                }
                catch (AuthenticationException | AuthorizationException ae) {
                    if (this.authExceptionRetryInterval == null) {
                        this.logger.error(ae, "Authentication/Authorization Exception and no authExceptionRetryInterval set");
                        this.fatalError = true;
                        exitThrowable = ae;
                        break;
                    }
                    this.logger.error(ae, "Authentication/Authorization Exception, retrying in " + this.authExceptionRetryInterval.toMillis() + " ms");
                    KafkaMessageListenerContainer.this.publishRetryAuthEvent(ae);
                    failedAuthRetry = true;
                    this.sleepFor(this.authExceptionRetryInterval);
                }
                catch (FencedInstanceIdException fie) {
                    this.fatalError = true;
                    this.logger.error(fie, "'group.instance.id' has been fenced");
                    exitThrowable = fie;
                    break;
                }
                catch (StopAfterFenceException e) {
                    // No break here: the loop condition is re-checked after stop(false).
                    this.logger.error(e, "Stopping container due to fencing");
                    KafkaMessageListenerContainer.this.stop(false);
                    exitThrowable = e;
                }
                catch (Error e) {
                    this.logger.error(e, "Stopping container due to an Error");
                    this.fatalError = true;
                    this.wrapUp(e);
                    throw e;
                }
                catch (Exception e) {
                    this.handleConsumerException(e);
                }
                finally {
                    this.clearThreadState();
                }
            }
            this.wrapUp(exitThrowable);
        }

        /**
         * Prepares the consumer thread before the poll loop starts.
         * <p>
         * Optionally renames the current thread, publishes the consumer-starting
         * event, records the current thread as the consumer thread, sets the
         * consumer group id via {@code KafkaUtils}, registers the seek callback,
         * resets the ack counter and timestamp, initializes explicitly assigned
         * partitions, notifies this or the parent container that this child has
         * started, and finally publishes the consumer-started event.
         */
        protected void initialize() {
            // Rename the thread only when this or the parent container asks for it.
            if (KafkaMessageListenerContainer.this.thisOrParentContainer.isChangeConsumerThreadName()) {
                Thread.currentThread().setName(KafkaMessageListenerContainer.this.thisOrParentContainer.getThreadNameSupplier().apply(KafkaMessageListenerContainer.this));
            }
            KafkaMessageListenerContainer.this.publishConsumerStartingEvent();
            this.consumerThread = Thread.currentThread();
            KafkaUtils.setConsumerGroupId(this.consumerGroupId);
            this.setupSeeks();
            this.count = 0;
            this.last = System.currentTimeMillis();
            this.initAssignedPartitions();
            KafkaMessageListenerContainer.this.thisOrParentContainer.childStarted(KafkaMessageListenerContainer.this);
            KafkaMessageListenerContainer.this.publishConsumerStartedEvent();
        }

        /**
         * Registers this consumer as the seek callback of the listener, when the
         * listener is seek-aware.
         */
        private void setupSeeks() {
            if (this.consumerSeekAwareListener == null) {
                return;
            }
            this.consumerSeekAwareListener.registerSeekCallback(this);
        }

        /**
         * Initializes explicitly assigned partitions while the container is running.
         * <p>
         * Does nothing when the container is not running or no partitions were
         * explicitly defined. A failure during initialization is logged and not
         * propagated.
         */
        private void initAssignedPartitions() {
            if (!KafkaMessageListenerContainer.this.isRunning() || this.definedPartitions == null) {
                return;
            }
            try {
                this.initPartitionsIfNeeded();
            }
            catch (Exception e) {
                this.logger.error(e, "Failed to set initial offsets");
            }
        }

        /**
         * Performs one iteration of the consumer loop: runs the pre-poll steps,
         * polls the consumer and hands any records to the listener.
         * <p>
         * Returns before polling if the container is no longer running after the
         * pre-poll steps. Returns without invoking the listener if the
         * {@code polling} flag was no longer set when the poll returned and the poll
         * produced a non-null result.
         */
        protected void pollAndInvoke() {
            // Pre-poll steps; the order of these calls is significant.
            this.doProcessCommits();
            this.fixTxOffsetsIfNeeded();
            this.idleBetweenPollIfNecessary();
            if (!this.seeks.isEmpty()) {
                this.processSeeks();
            }
            this.enforceRebalanceIfNecessary();
            this.pauseConsumerIfNecessary();
            this.pausePartitionsIfNecessary();
            // Timestamp read by checkConsumer() to detect a non-responsive consumer.
            this.lastPoll = System.currentTimeMillis();
            if (!KafkaMessageListenerContainer.this.isRunning()) {
                return;
            }
            this.polling.set(true);
            ConsumerRecords records = this.doPoll();
            // The compareAndSet fails only if the flag was reset since it was set above;
            // per the log message below, that is treated as "container stopped".
            if (!this.polling.compareAndSet(true, false) && records != null) {
                if (records.count() > 0) {
                    this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
                }
                return;
            }
            // Notify a seek-aware listener once, and only when partitions are explicitly defined.
            if (!this.firstPoll && this.definedPartitions != null && this.consumerSeekAwareListener != null) {
                this.firstPoll = true;
                this.consumerSeekAwareListener.onFirstPoll();
            }
            // An empty poll with a non-zero ack count in a count-based ack mode triggers a commit check.
            if (records != null && records.count() == 0 && this.isCountAck && this.count > 0) {
                this.commitIfNecessary();
                this.count = 0;
            }
            this.debugRecords(records);
            this.invokeIfHaveRecords(records);
            // Resume only when no records are left over; partitions only if the consumer itself is not paused.
            if (this.remainingRecords == null) {
                this.resumeConsumerIfNeccessary();
                if (!this.consumerPaused) {
                    this.resumePartitionsIfNecessary();
                }
            }
        }

        /**
         * Processes the records whose asynchronous handling failed.
         * <p>
         * Takes a snapshot of the {@code failedRecords} deque, removes that many
         * entries from the head of the deque, and invokes the error handler for
         * each snapshot entry. A failure while handling one entry is logged (with
         * its cause) and that entry is skipped; the remaining entries are still
         * processed.
         */
        protected void handleAsyncFailure() {
            List<FailedRecordTuple<K, V>> pendingFailures = new ArrayList<>(this.failedRecords);
            // Remove exactly the snapshotted entries; entries added concurrently stay queued.
            int capturedRecordsCount = pendingFailures.size();
            for (int i = 0; i < capturedRecordsCount; ++i) {
                this.failedRecords.pollFirst();
            }
            for (FailedRecordTuple<K, V> failedRecordTuple : pendingFailures) {
                try {
                    this.invokeErrorHandlerBySingleRecord(failedRecordTuple);
                }
                catch (Exception e) {
                    this.logger.warn(e, () -> "Async failed record failed to complete, thus skip it. record :" + failedRecordTuple + ", Exception : " + e.getMessage());
                }
            }
        }

        /**
         * Process pending commits unless {@code autoCommit} or {@code isRecordAck} is set.
         * <p>If {@code processCommits()} fails with a {@link CommitFailedException} while
         * {@code remainingRecords} is set and this is not a batch listener, those records
         * are detached from the container and handed to the common error handler's
         * {@code handleRemaining}. In every other case the exception is dropped.
         */
        private void doProcessCommits() {
            if (this.autoCommit || this.isRecordAck) {
                return;
            }
            try {
                this.processCommits();
            }
            catch (CommitFailedException cfe) {
                if (this.remainingRecords != null && !this.isBatchListener) {
                    ConsumerRecords<K, V> pending = this.remainingRecords;
                    // Clear the field before delegating so the records are not handed out again.
                    this.remainingRecords = null;
                    List<ConsumerRecord<?, ?>> records = new ArrayList<>();
                    for (ConsumerRecord<K, V> kvConsumerRecord : pending) {
                        records.add(kvConsumerRecord);
                    }
                    this.commonErrorHandler.handleRemaining(cfe, records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
                }
            }
        }

        /**
         * Dispatch a poll result: invoke the listener when records are present,
         * otherwise run the idle checks.
         * <p>The per-partition idle check also runs after a non-empty poll when fewer
         * partitions returned data than are currently assigned to the consumer.
         * @param records the poll result; may be {@code null} or empty.
         */
        private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
            boolean gotRecords = records != null && records.count() > 0;
            if (!gotRecords) {
                this.checkIdle();
                this.checkIdlePartitions();
                return;
            }
            this.receivedSome = true;
            this.savePositionsIfNeeded(records);
            this.notIdle();
            this.notIdlePartitions(records.partitions());
            this.invokeListener(records);
            if (records.partitions().size() < this.consumer.assignment().size()) {
                this.checkIdlePartitions();
            }
        }

        /**
         * Ask the poll thread state processor, when one is configured, to clear
         * its thread state for this consumer.
         */
        private void clearThreadState() {
            if (this.pollThreadStateProcessor == null) {
                return;
            }
            this.pollThreadStateProcessor.clearThreadState(this.consumer);
        }

        /**
         * Run the idle check for every partition currently assigned to the consumer.
         */
        private void checkIdlePartitions() {
            for (TopicPartition assignedPartition : this.consumer.assignment()) {
                this.checkIdlePartition(assignedPartition);
            }
        }

        /**
         * Publish an idle-partition event when the partition has neither received data
         * nor been alerted on for longer than the configured idle partition interval.
         * <p>Does nothing when no interval is configured. Partitions seen for the first
         * time have their last-receive and last-alert timestamps initialised to now.
         * @param topicPartition the partition to check.
         */
        private void checkIdlePartition(TopicPartition topicPartition) {
            Long interval = this.containerProperties.getIdlePartitionEventInterval();
            if (interval == null) {
                return;
            }
            long now = System.currentTimeMillis();
            Long lastReceived = this.lastReceivePartition.computeIfAbsent(topicPartition, tp -> now);
            Long lastAlerted = this.lastAlertPartition.computeIfAbsent(topicPartition, tp -> now);
            if (now <= lastReceived + interval || now <= lastAlerted + interval) {
                return;
            }
            this.wasIdlePartition.put(topicPartition, true);
            KafkaMessageListenerContainer.this.publishIdlePartitionEvent(now - lastReceived, topicPartition, this.consumer, KafkaMessageListenerContainer.this.isPartitionPauseRequested(topicPartition));
            this.lastAlertPartition.put(topicPartition, now);
            if (this.consumerSeekAwareListener != null) {
                this.seekPartitions(Collections.singletonList(topicPartition), true);
            }
        }

        /**
         * Mark each of the given partitions as not idle, provided an idle partition
         * event interval is configured.
         * @param partitions the partitions that returned records.
         */
        private void notIdlePartitions(Set<TopicPartition> partitions) {
            if (this.containerProperties.getIdlePartitionEventInterval() == null) {
                return;
            }
            for (TopicPartition partition : partitions) {
                this.notIdlePartition(partition);
            }
        }

        /**
         * Record that the partition received data now and, if it was previously flagged
         * idle, clear the flag and publish a no-longer-idle partition event.
         * @param topicPartition the partition that returned records.
         */
        private void notIdlePartition(TopicPartition topicPartition) {
            long now = System.currentTimeMillis();
            if (Boolean.TRUE.equals(this.wasIdlePartition.get(topicPartition))) {
                this.wasIdlePartition.put(topicPartition, false);
                Long previousReceive = this.lastReceivePartition.computeIfAbsent(topicPartition, tp -> now);
                KafkaMessageListenerContainer.this.publishNoLongerIdlePartitionEvent(now - previousReceive, this.consumer, topicPartition);
            }
            this.lastReceivePartition.put(topicPartition, now);
        }

        /**
         * Record that the container received data now and, if it was previously flagged
         * idle, clear the flag and publish a no-longer-idle container event.
         * Does nothing when no idle event interval is configured.
         */
        private void notIdle() {
            if (this.containerProperties.getIdleEventInterval() == null) {
                return;
            }
            long now = System.currentTimeMillis();
            if (this.wasIdle) {
                this.wasIdle = false;
                KafkaMessageListenerContainer.this.publishNoLongerIdleContainerEvent(now - this.lastReceive, this.consumer);
            }
            this.lastReceive = now;
        }

        /**
         * When {@code fixTxOffsets} is enabled, replace the saved positions with the
         * consumer's current position for every partition present in the records.
         * @param records the records just polled.
         */
        private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
            if (!this.fixTxOffsets) {
                return;
            }
            this.savedPositions.clear();
            for (TopicPartition tp : records.partitions()) {
                this.savedPositions.put(tp, this.consumer.position(tp));
            }
        }

        /**
         * When {@code fixTxOffsets} is enabled, commit the consumer's current position
         * for each partition in {@code lastCommits} whose position is beyond the last
         * committed offset.
         * <p>A partition is skipped when a saved position exists and differs from the
         * current position. Corrections are committed directly when there is no Kafka
         * transaction manager, otherwise they are sent through the transaction template.
         * Any exception is logged, and {@code lastCommits} is always cleared afterwards.
         */
        private void fixTxOffsetsIfNeeded() {
            if (!this.fixTxOffsets) {
                return;
            }
            try {
                HashMap<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
                for (Map.Entry<TopicPartition, OffsetAndMetadata> lastCommit : this.lastCommits.entrySet()) {
                    TopicPartition tp = lastCommit.getKey();
                    OffsetAndMetadata oamd = lastCommit.getValue();
                    long position = this.consumer.position(tp);
                    Long saved = this.savedPositions.get(tp);
                    if (saved != null && saved != position) {
                        this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; saved: " + this.savedPositions + ", committed: " + oamd + ", current: " + tp + "@" + position);
                        continue;
                    }
                    if (position > oamd.offset()) {
                        toFix.put(tp, this.createOffsetAndMetadata(position));
                    }
                }
                if (toFix.isEmpty()) {
                    return;
                }
                this.logger.debug(() -> "Fixing TX offsets: " + toFix);
                if (this.kafkaTxManager != null) {
                    this.transactionTemplate.executeWithoutResult(status -> this.doSendOffsets(this.getTxProducer(), toFix));
                }
                else {
                    this.commitOffsets(toFix);
                }
            }
            catch (Exception e) {
                this.logger.error(e, () -> "Failed to correct transactional offset(s): " + this.lastCommits);
            }
            finally {
                this.lastCommits.clear();
            }
        }

        /**
         * Obtain the records to process for this iteration.
         * <p>When {@code isBatchListener} and {@code subBatchPerPartition} are both set,
         * the consumer is polled only while no partition iterator is pending; each call
         * then returns the records of the next partition of that poll wrapped in its own
         * {@link ConsumerRecords}. Otherwise the consumer is polled on every call and any
         * {@code remainingRecords} may be returned in place of the poll result.
         * @return the records to process; an empty poll result is returned unchanged.
         */
        @Nullable
        private ConsumerRecords<K, V> doPoll() {
            ConsumerRecords records;
            if (this.isBatchListener && this.subBatchPerPartition) {
                if (this.batchIterator == null) {
                    // No sub-batch pending: poll and start iterating that poll's partitions.
                    this.lastBatch = this.pollConsumer();
                    this.captureOffsets(this.lastBatch);
                    if (this.lastBatch.count() == 0) {
                        // Nothing to split into sub-batches.
                        return this.lastBatch;
                    }
                    this.batchIterator = this.lastBatch.partitions().iterator();
                }
                // Hand out the records of exactly one partition per call.
                TopicPartition next = this.batchIterator.next();
                List subBatch = this.lastBatch.records(next);
                records = new ConsumerRecords(Collections.singletonMap(next, subBatch));
                if (!this.batchIterator.hasNext()) {
                    // Last partition handed out; the next call polls again.
                    this.batchIterator = null;
                }
            } else {
                records = this.pollConsumer();
                if (this.remainingRecords != null) {
                    int howManyRecords = records.count();
                    if (howManyRecords > 0) {
                        // A non-empty poll while records are still pending: log and run the emergency stop.
                        this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused after an error; emergency stop invoked to avoid message loss", howManyRecords));
                        KafkaMessageListenerContainer.this.emergencyStop.run();
                    }
                    TopicPartition firstPart = (TopicPartition)this.remainingRecords.partitions().iterator().next();
                    boolean isPaused = KafkaMessageListenerContainer.this.isPauseRequested() || KafkaMessageListenerContainer.this.isPartitionPauseRequested(firstPart);
                    this.logger.debug(() -> "First pending after error: " + String.valueOf(firstPart) + "; paused: " + isPaused);
                    if (!isPaused) {
                        // Return the pending records instead of the poll result and clear them.
                        records = this.remainingRecords;
                        this.remainingRecords = null;
                    }
                }
                this.captureOffsets(records);
                this.checkRebalanceCommits();
            }
            return records;
        }

        /**
         * Poll the consumer after running {@code beforePoll()}, using the paused poll
         * timeout while the consumer is paused and the regular poll timeout otherwise.
         * @return the polled records, or an empty {@link ConsumerRecords} when the poll
         * is interrupted by a {@code WakeupException}.
         */
        private ConsumerRecords<K, V> pollConsumer() {
            this.beforePoll();
            try {
                if (this.consumerPaused) {
                    return this.consumer.poll(this.pollTimeoutWhilePaused);
                }
                return this.consumer.poll(this.pollTimeout);
            }
            catch (WakeupException ex) {
                return ConsumerRecords.empty();
            }
        }

        /**
         * Ask the poll thread state processor, when one is configured, to set up
         * its thread state for this consumer.
         */
        private void beforePoll() {
            if (this.pollThreadStateProcessor == null) {
                return;
            }
            this.pollThreadStateProcessor.setupThreadState(this.consumer);
        }

        /**
         * Rebuild the per-partition offset bookkeeping from a non-empty poll result.
         * <p>Does nothing when {@code offsetsInThisBatch} is {@code null} or the records
         * are empty. Otherwise both {@code offsetsInThisBatch} and {@code deferredOffsets}
         * are cleared and repopulated: each partition gets the list of its record
         * offsets and an empty deferred list.
         * @param records the records just polled.
         */
        private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
            if (this.offsetsInThisBatch == null || records.count() <= 0) {
                return;
            }
            this.offsetsInThisBatch.clear();
            this.deferredOffsets.clear();
            for (TopicPartition part : records.partitions()) {
                List<Long> offs = new LinkedList<>();
                this.offsetsInThisBatch.put(part, offs);
                this.deferredOffsets.put(part, new LinkedList<>());
                for (ConsumerRecord<K, V> rec : records.records(part)) {
                    offs.add(rec.offset());
                }
            }
        }

        /**
         * Commit the offsets collected in {@code commitsDuringRebalance}, if any.
         * <p>Only offsets for partitions contained in {@code assignedPartitions} are
         * committed; the rest are reported in a warning. The collected offsets are
         * cleared before the synchronous commit is issued.
         */
        private void checkRebalanceCommits() {
            if (this.commitsDuringRebalance.isEmpty()) {
                return;
            }
            // Single pass: key true = partition is in assignedPartitions, key false = it is not.
            Map<Boolean, Map<TopicPartition, OffsetAndMetadata>> byAssignment = this.commitsDuringRebalance.entrySet().stream()
                    .collect(Collectors.partitioningBy(entry -> this.assignedPartitions.contains(entry.getKey()),
                            Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            Map<TopicPartition, OffsetAndMetadata> commits = byAssignment.get(true);
            Map<TopicPartition, OffsetAndMetadata> uncommitted = byAssignment.get(false);
            if (!uncommitted.isEmpty()) {
                this.logger.warn(() -> "These offsets could not be committed; partition(s) lost during rebalance: " + uncommitted);
            }
            this.commitsDuringRebalance.clear();
            this.logger.debug(() -> "Commit list: " + commits);
            this.commitSync(commits);
        }

        /**
         * Atomically clear the {@code polling} flag and wake the consumer if the
         * flag was set.
         */
        void wakeIfNecessaryForStop() {
            boolean wasPolling = this.polling.getAndSet(false);
            if (wasPolling) {
                this.consumer.wakeup();
            }
        }

        /**
         * Wake the consumer if the {@code polling} flag is set; the flag itself
         * is left unchanged.
         */
        void wakeIfNecessary() {
            if (!this.polling.get()) {
                return;
            }
            this.consumer.wakeup();
        }

        /**
         * Log the number of received records at DEBUG level and, for a non-empty
         * result, the {@code topic-partition@offset} of every record at TRACE level.
         * @param records the poll result; nothing is logged when {@code null}.
         */
        private void debugRecords(@Nullable ConsumerRecords<K, V> records) {
            if (records == null) {
                return;
            }
            this.logger.debug(() -> "Received: " + records.count() + " records");
            if (records.count() <= 0) {
                return;
            }
            this.logger.trace(() -> records.partitions().stream()
                    .flatMap(p -> records.records(p).stream())
                    .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
                    .toList()
                    .toString());
        }

        /**
         * Sleep for the given duration via {@code ListenerUtils.stoppableSleep}.
         * If the thread is interrupted, the interrupt flag is restored and the
         * interruption is logged at ERROR level.
         * @param duration how long to sleep.
         */
        private void sleepFor(Duration duration) {
            long millis = duration.toMillis();
            try {
                ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this, millis);
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                this.logger.error(interrupted, "Interrupted while sleeping");
            }
        }

        /**
         * Enforce a consumer rebalance when one has been requested on this or the
         * parent container. The request flag is always reset afterwards, even if
         * enforcing the rebalance throws.
         */
        private void enforceRebalanceIfNecessary() {
            AtomicBoolean rebalanceRequested = KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested;
            try {
                if (rebalanceRequested.get()) {
                    String reason = String.format("Enforced rebalance requested for container: %s", KafkaMessageListenerContainer.this.getListenerId());
                    this.logger.info(reason);
                    this.consumer.enforceRebalance(reason);
                }
            }
            finally {
                rebalanceRequested.set(false);
            }
        }

        /*
         * WARNING (decompiler note) - a try block that caught itself was removed while
         * reconstructing this method - possible behaviour change.
         */
        /**
         * Run {@code doPauseConsumerIfNecessary()}, holding this object's monitor
         * while doing so whenever {@code offsetsInThisBatch} is in use (non-null).
         */
        private void pauseConsumerIfNecessary() {
            if (this.offsetsInThisBatch == null) {
                this.doPauseConsumerIfNecessary();
                return;
            }
            synchronized (this) {
                this.doPauseConsumerIfNecessary();
            }
        }

        /**
         * Pause all assigned partitions when a pause is called for.
         * <p>Nothing happens while partitions are still paused for a nack sleep.
         * Outstanding entries in {@code offsetsInThisBatch} switch the container into
         * the paused-for-async-acks state. The consumer is then paused when it is not
         * already paused and either a pause was requested or async acks are outstanding,
         * or when {@code pauseForPending} is set, provided there are assigned partitions.
         */
        private void doPauseConsumerIfNecessary() {
            if (!this.pausedForNack.isEmpty()) {
                this.logger.debug("Still paused for nack sleep");
                return;
            }
            if (this.offsetsInThisBatch != null && !this.offsetsInThisBatch.isEmpty() && !this.pausedForAsyncAcks) {
                this.pausedForAsyncAcks = true;
                this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
            }
            boolean pauseWanted = !this.consumerPaused && (KafkaMessageListenerContainer.this.isPauseRequested() || this.pausedForAsyncAcks);
            if (!(pauseWanted || this.pauseForPending)) {
                return;
            }
            Collection<TopicPartition> assigned = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (CollectionUtils.isEmpty(assigned)) {
                return;
            }
            this.consumer.pause(assigned);
            this.consumerPaused = true;
            this.pauseForPending = false;
            this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
            String reason = this.pausedForAsyncAcks ? "Incomplete out of order acks" : "User requested";
            KafkaMessageListenerContainer.this.publishConsumerPausedEvent(assigned, reason);
        }

        /*
         * WARNING (decompiler note) - a try block that caught itself was removed while
         * reconstructing this method - possible behaviour change.
         */
        /**
         * Resume consumption where appropriate.
         * <p>While a nack wake time is pending, only the nack-paused partitions are
         * considered: they are resumed once the wake time has passed. Otherwise
         * {@code doResumeConsumerIfNeccessary()} runs, under this object's monitor
         * whenever {@code offsetsInThisBatch} is in use (non-null).
         */
        private void resumeConsumerIfNeccessary() {
            if (this.nackWakeTimeMillis > 0L) {
                if (System.currentTimeMillis() > this.nackWakeTimeMillis) {
                    this.nackWakeTimeMillis = 0L;
                    this.consumer.resume(this.pausedForNack);
                    this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
                    KafkaMessageListenerContainer.this.publishConsumerResumedEvent(this.pausedForNack);
                    this.pausedForNack.clear();
                }
                return;
            }
            if (this.offsetsInThisBatch == null) {
                this.doResumeConsumerIfNeccessary();
                return;
            }
            synchronized (this) {
                this.doResumeConsumerIfNeccessary();
            }
        }

        /**
         * Leave the paused-for-async-acks state once {@code offsetsInThisBatch} is
         * empty, then resume the consumer if it is paused, no pause is requested and
         * no async acks are outstanding.
         * <p>Partitions listed in {@code pausedPartitions} are excluded from the resume.
         */
        private void doResumeConsumerIfNeccessary() {
            if (this.pausedForAsyncAcks && this.offsetsInThisBatch.isEmpty()) {
                this.pausedForAsyncAcks = false;
                this.logger.debug("Resuming after manual async acks cleared");
            }
            boolean canResume = this.consumerPaused && !KafkaMessageListenerContainer.this.isPauseRequested() && !this.pausedForAsyncAcks;
            if (!canResume) {
                return;
            }
            this.logger.debug(() -> "Resuming consumption from: " + this.consumer.paused());
            LinkedList<TopicPartition> toResume = new LinkedList<>(this.consumer.paused());
            toResume.removeAll(this.pausedPartitions);
            this.consumer.resume(toResume);
            this.consumerPaused = false;
            KafkaMessageListenerContainer.this.publishConsumerResumedEvent(toResume);
        }

        /**
         * Pause every assigned partition for which a pause has been requested and
         * which the consumer does not already report as paused, then record it in
         * {@code pausedPartitions} and publish a partition-paused event for each.
         */
        private void pausePartitionsIfNecessary() {
            Set<TopicPartition> alreadyPaused = this.consumer.paused();
            Collection<TopicPartition> assigned = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (assigned == null) {
                return;
            }
            List<TopicPartition> partitionsToPause = assigned.stream()
                    .filter(tp -> KafkaMessageListenerContainer.this.isPartitionPauseRequested(tp) && !alreadyPaused.contains(tp))
                    .toList();
            if (partitionsToPause.isEmpty()) {
                return;
            }
            this.consumer.pause(partitionsToPause);
            this.pausedPartitions.addAll(partitionsToPause);
            this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
            for (TopicPartition pausedPartition : partitionsToPause) {
                KafkaMessageListenerContainer.this.publishConsumerPartitionPausedEvent(pausedPartition);
            }
        }

        /**
         * Resume every assigned partition that is recorded in {@code pausedPartitions}
         * but no longer has a pause requested, then remove it from that record and
         * publish a partition-resumed event for each.
         */
        private void resumePartitionsIfNecessary() {
            Collection<TopicPartition> assigned = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (assigned == null) {
                return;
            }
            List<TopicPartition> partitionsToResume = assigned.stream()
                    .filter(tp -> !KafkaMessageListenerContainer.this.isPartitionPauseRequested(tp) && this.pausedPartitions.contains(tp))
                    .toList();
            if (partitionsToResume.isEmpty()) {
                return;
            }
            this.consumer.resume(partitionsToResume);
            this.pausedPartitions.removeAll(partitionsToResume);
            this.logger.debug(() -> "Resumed consumption from " + partitionsToResume);
            for (TopicPartition resumedPartition : partitionsToResume) {
                KafkaMessageListenerContainer.this.publishConsumerPartitionResumedEvent(resumedPartition);
            }
        }

        /**
         * Publish an idle-container event when nothing has been received and no idle
         * alert has been raised for longer than the configured idle event interval.
         * <p>Until the first records have been received, the interval is scaled by the
         * configured idle-before-data multiplier. Does nothing when no interval is set.
         */
        private void checkIdle() {
            Long configuredInterval = this.containerProperties.getIdleEventInterval();
            if (configuredInterval == null) {
                return;
            }
            long threshold = configuredInterval;
            long now = System.currentTimeMillis();
            if (!this.receivedSome) {
                threshold = (long)((double)threshold * this.containerProperties.getIdleBeforeDataMultiplier());
            }
            if (now <= this.lastReceive + threshold || now <= this.lastAlertAt + threshold) {
                return;
            }
            this.wasIdle = true;
            KafkaMessageListenerContainer.this.publishIdleContainerEvent(now - this.lastReceive, this.consumer, this.consumerPaused);
            this.lastAlertAt = now;
            if (this.consumerSeekAwareListener == null) {
                return;
            }
            Collection<TopicPartition> partitions = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (partitions != null) {
                this.seekPartitions(partitions, true);
            }
        }

        /**
         * Sleep between polls when an idle-between-polls time is configured and
         * partitions are assigned.
         * <p>The sleep is capped at {@code maxPollInterval} minus the time elapsed since
         * the last poll minus 5000 ms; no sleep happens when that leaves nothing.
         * @throws IllegalStateException if the thread is interrupted while sleeping
         * (the interrupt flag is restored first).
         */
        private void idleBetweenPollIfNecessary() {
            long idleBetweenPolls = this.containerProperties.getIdleBetweenPolls();
            Collection<TopicPartition> assigned = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (idleBetweenPolls <= 0L || assigned == null || assigned.isEmpty()) {
                return;
            }
            long sinceLastPoll = System.currentTimeMillis() - this.lastPoll;
            long sleepMillis = Math.min(idleBetweenPolls, this.maxPollInterval - sinceLastPoll - 5000L);
            if (sleepMillis <= 0L) {
                return;
            }
            try {
                ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this, sleepMillis);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Consumer Thread [" + this + "] has been interrupted", ex);
            }
        }

        /**
         * Release this consumer's resources and publish the stopping/stopped events.
         * <p>Without a fatal error (and without a Kafka transaction manager) pending
         * acks are committed and the consumer unsubscribes; with a fatal error the
         * container's emergency stop is run instead. Afterwards the monitor task is
         * cancelled, the consumer is closed, thread state is cleared and a seek-aware
         * listener, if any, is told its partitions were revoked.
         * @param throwable the throwable passed on to the consumer-stopped event; may be {@code null}.
         */
        private void wrapUp(@Nullable Throwable throwable) {
            KafkaUtils.clearConsumerGroupId();
            if (this.micrometerHolder != null) {
                this.micrometerHolder.destroy();
            }
            KafkaMessageListenerContainer.this.publishConsumerStoppingEvent(this.consumer);
            // Captured before the consumer is closed; passed to the seek-aware listener below.
            Collection<TopicPartition> partitions = KafkaMessageListenerContainer.this.getAssignedPartitions();
            if (!this.fatalError) {
                if (this.kafkaTxManager == null) {
                    this.commitPendingAcks();
                    try {
                        this.consumer.unsubscribe();
                    }
                    // A wakeup raised by unsubscribe() is ignored so the shutdown sequence continues.
                    catch (WakeupException wakeupException) {}
                }
            } else {
                // Errors are not logged here; any other fatal throwable gets an ERROR entry.
                if (!(throwable instanceof Error)) {
                    this.logger.error((CharSequence)"Fatal consumer exception; stopping container");
                }
                KafkaMessageListenerContainer.this.emergencyStop.run();
            }
            this.monitorTask.cancel(true);
            // Only destroy the scheduler when it was not explicitly set.
            if (!this.taskSchedulerExplicitlySet) {
                ((ThreadPoolTaskScheduler)this.taskScheduler).destroy();
            }
            this.consumer.close();
            KafkaMessageListenerContainer.this.getAfterRollbackProcessor().clearThreadState();
            if (this.commonErrorHandler != null) {
                this.commonErrorHandler.clearThreadState();
            }
            if (this.consumerSeekAwareListener != null) {
                this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
                this.consumerSeekAwareListener.unregisterSeekCallback();
            }
            this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
            KafkaMessageListenerContainer.this.publishConsumerStoppedEvent(throwable);
        }

        /**
         * Handle an exception raised by the consumer.
         * <p>A {@link RetriableCommitFailedException} is only logged. Any other exception
         * is passed to the common error handler's {@code handleOtherException}, or logged
         * when no handler is configured; an exception thrown while doing so is logged too.
         * @param e the exception to handle.
         */
        protected void handleConsumerException(Exception e) {
            if (e instanceof RetriableCommitFailedException) {
                this.logger.error(e, "Commit retries exhausted");
                return;
            }
            try {
                if (this.commonErrorHandler == null) {
                    this.logger.error(e, "Consumer exception");
                }
                else {
                    this.commonErrorHandler.handleOtherException(e, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, this.isBatchListener);
                }
            }
            catch (Exception handlerFailure) {
                this.logger.error(handlerFailure, "Consumer exception");
            }
        }

        /**
         * Process pending commits and then, if {@code offsets} is still non-empty,
         * commit what remains.
         */
        private void commitPendingAcks() {
            this.processCommits();
            if (this.offsets.isEmpty()) {
                return;
            }
            this.commitIfNecessary();
        }

        /**
         * Drain the {@code acks} queue, tracing and processing each queued record
         * until the queue reports no more entries.
         */
        private void handleAcks() {
            ConsumerRecord<K, V> pending;
            while ((pending = this.acks.poll()) != null) {
                this.traceAck(pending);
                this.processAck(pending);
            }
        }

        /**
         * Log the acknowledged record at TRACE level.
         * @param cRecord the record being acknowledged.
         */
        private void traceAck(ConsumerRecord<K, V> cRecord) {
            this.logger.trace(
                    () -> "Ack: " + KafkaUtils.format(cRecord));
        }

        /**
         * Trace the acknowledgment, then route it through in-order handling when
         * {@code offsetsInThisBatch} is in use, or process it directly otherwise.
         * @param cRecord the record being acknowledged.
         */
        private void doAck(ConsumerRecord<K, V> cRecord) {
            this.traceAck(cRecord);
            if (this.offsetsInThisBatch == null) {
                this.processAck(cRecord);
                return;
            }
            this.ackInOrder(cRecord);
        }

        /*
         * Decompiler notes: force condition propagation was enabled and jumps were
         * lifted to return sites while reconstructing this method.
         */
        /**
         * Process an acknowledgment for a single record.
         * <p>On the consumer thread the record is committed immediately when manual
         * immediate acks are in use, otherwise its offset is added to the pending
         * offsets. On any other thread the record is queued in {@code acks}, and the
         * consumer is woken when manual immediate acks are in use or the container is
         * paused for async acks.
         * @param cRecord the record being acknowledged.
         * @throws KafkaException if the calling thread is interrupted while queueing the ack.
         */
        private void processAck(ConsumerRecord<K, V> cRecord) {
            boolean onConsumerThread = Thread.currentThread().equals(this.consumerThread);
            if (onConsumerThread) {
                if (this.isManualImmediateAck) {
                    try {
                        this.ackImmediate(cRecord);
                    }
                    catch (WakeupException ignored) {
                        // a wakeup surfacing during the immediate commit is deliberately dropped
                    }
                }
                else {
                    this.addOffset(cRecord);
                }
                return;
            }
            try {
                this.acks.put(cRecord);
                if (this.isManualImmediateAck || this.pausedForAsyncAcks) {
                    this.consumer.wakeup();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while storing ack", e);
            }
        }

        /*
         * Decompiler notes: force condition propagation was enabled and jumps were
         * lifted to return sites while reconstructing this method.
         */
        /**
         * Process an acknowledgment for a whole batch of records.
         * <p>On the consumer thread the batch is committed immediately when manual
         * immediate acks are in use, otherwise every record's offset is added to the
         * pending offsets. On any other thread every record is queued in {@code acks},
         * and the consumer is woken when manual immediate acks are in use.
         * @param records the records being acknowledged.
         * @throws KafkaException if the calling thread is interrupted while queueing the acks.
         */
        private void processAcks(ConsumerRecords<K, V> records) {
            boolean onConsumerThread = Thread.currentThread().equals(this.consumerThread);
            if (onConsumerThread) {
                if (this.isManualImmediateAck) {
                    try {
                        this.ackImmediate(records);
                    }
                    catch (WakeupException ignored) {
                        // a wakeup surfacing during the immediate commit is deliberately dropped
                    }
                }
                else {
                    for (ConsumerRecord<K, V> acked : records) {
                        this.addOffset(acked);
                    }
                }
                return;
            }
            try {
                for (ConsumerRecord<K, V> acked : records) {
                    this.acks.put(acked);
                }
                if (this.isManualImmediateAck) {
                    this.consumer.wakeup();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted while storing ack", e);
            }
        }

        /**
         * Acknowledge a record while keeping commits in offset order within its partition.
         * <p>If the record is the first outstanding offset of its partition it is
         * acknowledged, together with any directly following records that were
         * deferred earlier; only the last record of that contiguous run is passed to
         * {@code processAck}. A record ahead of the first outstanding offset is
         * deferred until the gap is closed. Bookkeeping for the partition is removed
         * once no offsets remain outstanding.
         * @param cRecord the record being acknowledged.
         * @throws IllegalStateException if no offsets are outstanding for the record's
         * partition, or if the record's offset is before the first outstanding offset.
         */
        private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
            TopicPartition part = new TopicPartition(cRecord.topic(), cRecord.partition());
            List<Long> offs = this.offsetsInThisBatch.get(part);
            if (ObjectUtils.isEmpty(offs)) {
                throw new IllegalStateException("Unexpected ack for " + KafkaUtils.format(cRecord) + "; offsets list is empty");
            }
            List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
            long firstOutstanding = offs.get(0);
            if (firstOutstanding != cRecord.offset()) {
                if (cRecord.offset() < firstOutstanding) {
                    throw new IllegalStateException("First remaining offset for this batch is " + firstOutstanding + "; you are acknowledging a stale record: " + KafkaUtils.format(cRecord));
                }
                // Out of order: hold the record back until the earlier offsets are acknowledged.
                deferred.add(cRecord);
                return;
            }
            offs.remove(0);
            ConsumerRecord<K, V> recordToAck = cRecord;
            if (!deferred.isEmpty()) {
                deferred.sort((a, b) -> Long.compare(a.offset(), b.offset()));
                // Absorb deferred records that directly continue the acknowledged run.
                while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1L) {
                    recordToAck = deferred.remove(0);
                    offs.remove(0);
                }
            }
            this.processAck(recordToAck);
            if (offs.isEmpty()) {
                this.deferredOffsets.remove(part);
                this.offsetsInThisBatch.remove(part);
            }
        }

        /**
         * Immediately commit the offset for a single record.
         * @param cRecord the record whose offset is committed.
         */
        private void ackImmediate(ConsumerRecord<K, V> cRecord) {
            this.commitOffsetsInTransactions(this.buildSingleCommits(cRecord));
        }

        /**
         * Immediately commit, for each partition in the batch, the offset following
         * the last record of that partition.
         * @param records the batch whose offsets are committed.
         */
        private void ackImmediate(ConsumerRecords<K, V> records) {
            HashMap<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
            for (TopicPartition part : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(part);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                commits.put(part, this.createOffsetAndMetadata(lastOffset + 1L));
            }
            this.commitOffsetsInTransactions(commits);
        }

        /**
         * Dispatch the records to the batch listener or the record listener,
         * depending on {@code isBatchListener}.
         * @param records the records to deliver.
         */
        private void invokeListener(ConsumerRecords<K, V> records) {
            if (!this.isBatchListener) {
                this.invokeRecordListener(records);
                return;
            }
            this.invokeBatchListener(records);
        }

        /**
         * Invoke the batch listener with the records left after the early intercept.
         * <p>Nothing is invoked when the intercept yields no records. Unless the listener
         * wants the full {@link ConsumerRecords}, a record list is built and the listener
         * is skipped when that list is empty. The invocation runs inside a transaction
         * when a transaction template is configured.
         * @param recordsArg the polled records.
         */
        private void invokeBatchListener(ConsumerRecords<K, V> recordsArg) {
            ConsumerRecords<K, V> records = this.checkEarlyIntercept(recordsArg);
            if (records == null || records.count() == 0) {
                return;
            }
            // The list stays null when the listener consumes the full ConsumerRecords.
            List<ConsumerRecord<K, V>> recordList = this.wantsFullRecords ? null : this.createRecordList(records);
            if (recordList != null && recordList.isEmpty()) {
                return;
            }
            if (this.transactionTemplate == null) {
                this.doInvokeBatchListener(records, recordList);
            }
            else {
                this.invokeBatchListenerInTx(records, recordList);
            }
        }

        /**
         * Invoke the batch listener inside a transaction run by the transaction template.
         * <p>A fencing exception is logged and, when the container is configured to stop
         * when fenced, rethrown as a {@code StopAfterFenceException}; otherwise it is not
         * propagated. Any other {@link RuntimeException} is logged and passed to
         * {@code batchRollback}.
         * @param records the records to deliver.
         * @param recordList the same records as a list; may be {@code null}.
         */
        private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records, final @Nullable List<ConsumerRecord<K, V>> recordList) {
            try {
                this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                    public void doInTransactionWithoutResult(TransactionStatus s) {
                        RuntimeException aborted;
                        // A transactional producer is only obtained when a Kafka transaction manager is in use.
                        if (ListenerConsumer.this.kafkaTxManager != null) {
                            ListenerConsumer.this.producer = ListenerConsumer.this.getTxProducer();
                        }
                        // A non-null result is thrown from inside the callback, i.e. within the transaction.
                        if ((aborted = ListenerConsumer.this.doInvokeBatchListener(records, recordList)) != null) {
                            throw aborted;
                        }
                    }
                });
            }
            catch (FencedInstanceIdException | ProducerFencedException e) {
                this.logger.error(e, (CharSequence)"Producer or 'group.instance.id' fenced during transaction");
                if (this.containerProperties.isStopContainerWhenFenced()) {
                    throw new StopAfterFenceException("Container stopping due to fencing", e);
                }
            }
            catch (RuntimeException e) {
                this.logger.error((Throwable)e, (CharSequence)"Transaction rolled back");
                this.batchRollback(records, recordList, e);
            }
        }

        /**
         * Hand a rolled-back batch to the after-rollback processor.
         * <p>The processor runs inside a new transaction when it asks to be processed
         * in a transaction and a transaction template is available. Otherwise it runs
         * directly, and any exception it throws is logged instead of propagated.
         * @param records the records of the rolled-back batch.
         * @param recordList the same records as a list; built from {@code records} when {@code null}.
         * @param e the exception that caused the rollback.
         */
        private void batchRollback(final ConsumerRecords<K, V> records, final @Nullable List<ConsumerRecord<K, V>> recordList, final RuntimeException e) {
            final AfterRollbackProcessor rollbackProcessor = KafkaMessageListenerContainer.this.getAfterRollbackProcessor();
            boolean processInTx = rollbackProcessor.isProcessInTransaction() && this.transactionTemplate != null;
            if (!processInTx) {
                try {
                    rollbackProcessor.processBatch(records, Objects.requireNonNullElseGet(recordList, () -> this.createRecordList(records)), this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, e, this.wantsBatchRecoverAfterRollback, this.eosMode);
                }
                catch (Exception processorFailure) {
                    this.logger.error(processorFailure, "AfterRollbackProcessor threw exception");
                }
                return;
            }
            this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                @Override
                protected void doInTransactionWithoutResult(TransactionStatus status) {
                    rollbackProcessor.processBatch(records, Objects.requireNonNullElseGet(recordList, () -> ListenerConsumer.this.createRecordList(records)), ListenerConsumer.this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, e, ListenerConsumer.this.wantsBatchRecoverAfterRollback, ListenerConsumer.this.eosMode);
                }
            });
        }

        /**
         * Copies every record of the batch, in iteration order, into a new list.
         *
         * @param records the batch to flatten
         * @return a new mutable list holding the records
         */
        private List<ConsumerRecord<K, V>> createRecordList(ConsumerRecords<K, V> records) {
            List<ConsumerRecord<K, V>> flattened = new ArrayList<>(records.count());
            for (ConsumerRecord<K, V> cRecord : records) {
                flattened.add(cRecord);
            }
            return flattened;
        }

        /**
         * Invokes the batch listener and, if it throws, routes the failure to the
         * common error handler.
         *
         * @param records the batch to deliver
         * @param recordList the same batch as a list
         * @return {@code null} when the listener (or the error handler) completed, or when
         * the thread was interrupted; otherwise the runtime exception thrown by the error
         * handler
         * @throws RuntimeException the listener's exception when no common error handler is set
         */
        @Nullable
        private RuntimeException doInvokeBatchListener(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList) {
            try {
                this.invokeBatchOnMessage(records, recordList);
                if (this.batchFailed) {
                    // The flag was raised by an earlier failed invocation; this one
                    // completed, so reset the per-thread error state.
                    this.batchFailed = false;
                    if (this.commonErrorHandler != null) {
                        this.commonErrorHandler.clearThreadState();
                    }
                    KafkaMessageListenerContainer.this.getAfterRollbackProcessor().clearThreadState();
                }
                boolean commitNow = !this.autoCommit && !this.isRecordAck;
                if (commitNow) {
                    this.processCommits();
                }
                return null;
            }
            catch (InterruptedException interrupted) {
                // Restore the interrupt flag for code further up the stack.
                Thread.currentThread().interrupt();
                return null;
            }
            catch (RuntimeException listenerFailure) {
                if (this.commonErrorHandler == null) {
                    throw listenerFailure;
                }
                try {
                    this.invokeBatchErrorHandler(records, recordList, listenerFailure);
                    this.commitOffsetsIfNeededAfterHandlingError(records);
                    return null;
                }
                catch (RecordInRetryException retrying) {
                    this.logger.info((CharSequence)"Record in retry and not yet recovered");
                    return retrying;
                }
                catch (KafkaException kafkaFailure) {
                    kafkaFailure.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
                    return kafkaFailure;
                }
                catch (RuntimeException handlerFailure) {
                    this.logger.error((Throwable)handlerFailure, (CharSequence)ERROR_HANDLER_THREW_AN_EXCEPTION);
                    return handlerFailure;
                }
                catch (Error fatal) {
                    this.logger.error((Throwable)fatal, (CharSequence)"Error handler threw an error");
                    throw fatal;
                }
            }
        }

        /**
         * Queues acknowledgments for a batch after the error handler has dealt with it.
         * <p>
         * Nothing happens unless either a producer is present, or auto-commit is off,
         * the error handler reports {@code isAckAfterHandle()} and a consumer group id
         * is set. When {@code remainingRecords} is set, only the records that precede
         * its first element are acknowledged; otherwise the highest-offset records of
         * the batch are. With a producer present, the offsets are then sent to the
         * transaction.
         *
         * @param records the batch that was handled
         */
        private void commitOffsetsIfNeededAfterHandlingError(ConsumerRecords<K, V> records) {
            boolean ackAfterHandle = !this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null;
            if (!ackAfterHandle && this.producer == null) {
                return;
            }
            if (this.remainingRecords == null) {
                this.acks.addAll(this.getHighestOffsetRecords(records));
            } else {
                ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
                for (ConsumerRecord<K, V> candidate : records) {
                    if (candidate.equals(firstUncommitted)) {
                        break;
                    }
                    this.acks.add(candidate);
                }
            }
            if (this.producer != null) {
                this.sendOffsetsToTransaction();
            }
        }

        /**
         * Notifies the common batch interceptor, if any, of the outcome of a batch.
         * Exceptions thrown by the interceptor are logged and not propagated.
         *
         * @param records the batch that was processed
         * @param exception the listener failure, or {@code null} on success
         */
        private void batchInterceptAfter(ConsumerRecords<K, V> records, @Nullable Exception exception) {
            if (this.commonBatchInterceptor == null) {
                return;
            }
            try {
                if (exception != null) {
                    this.commonBatchInterceptor.failure(records, exception, this.consumer);
                } else {
                    this.commonBatchInterceptor.success(records, this.consumer);
                }
            }
            catch (Exception interceptorFailure) {
                this.logger.error((Throwable)interceptorFailure, (CharSequence)"BatchInterceptor.success/failure threw an exception");
            }
        }

        /**
         * Starts a timer sample when a micrometer holder is present.
         *
         * @return the sample from {@code micrometerHolder.start()}, or {@code null} when
         * there is no holder
         */
        @Nullable
        private Object startMicrometerSample() {
            return this.micrometerHolder == null ? null : this.micrometerHolder.start();
        }

        /**
         * Reports a successful timing to the micrometer holder.
         *
         * @param sample the sample to complete; nothing happens when {@code null}
         * @param record the record to pass along, used only when a tags provider is
         * configured and the record is not {@code null}
         */
        private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
            if (sample == null) {
                return;
            }
            boolean withRecord = this.micrometerTagsProvider != null && record != null;
            if (withRecord) {
                this.micrometerHolder.success(sample, record);
            } else {
                this.micrometerHolder.success(sample);
            }
        }

        /**
         * Reports a failed timing to the micrometer holder, labelled with the simple
         * class name of the exception's cause (or of the exception itself when it has
         * no cause).
         *
         * @param sample the sample to complete; nothing happens when {@code null}
         * @param record the record to pass along, used only when a tags provider is
         * configured and the record is not {@code null}
         * @param exception the failure to report
         */
        private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record, Throwable exception) {
            if (sample == null) {
                return;
            }
            Throwable cause = exception.getCause();
            Throwable named = cause != null ? cause : exception;
            String exceptionName = named.getClass().getSimpleName();
            boolean withRecord = this.micrometerTagsProvider != null && record != null;
            if (withRecord) {
                this.micrometerHolder.failure(sample, exceptionName, record);
            } else {
                this.micrometerHolder.failure(sample, exceptionName);
            }
        }

        /**
         * Delivers a batch to the listener, then performs the follow-up acknowledgment,
         * transactional offset send, and nack handling.
         * <p>
         * A non-negative {@code nackSleepDurationMillis} after the listener returns is
         * treated as "nack requested" (NOTE(review): presumably set by the batch
         * acknowledgment's nack; the writer is not visible in this section).
         *
         * @param records the batch to deliver
         * @param recordList the same batch as a list
         * @throws InterruptedException if interrupted while queuing acknowledgments
         */
        private void invokeBatchOnMessage(ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList) throws InterruptedException {
            this.invokeBatchOnMessageWithRecordsOrList(records, recordList);
            ArrayList toSeek = null;
            if (this.nackSleepDurationMillis >= 0L) {
                // Collect every record at position nackIndex or later; these are re-sought below.
                int index = 0;
                toSeek = new ArrayList();
                for (ConsumerRecord cRecord : records) {
                    if (index++ < this.nackIndex) continue;
                    toSeek.add(cRecord);
                }
            }
            // Container-managed acks apply when a producer is present, or when the
            // listener does not ack manually and auto-commit is off.
            if (this.producer != null || !this.isAnyManualAck && !this.autoCommit) {
                if (this.nackSleepDurationMillis < 0L) {
                    // No nack: acknowledge the whole batch.
                    this.ackBatch(records);
                }
                if (this.producer != null) {
                    this.sendOffsetsToTransaction();
                }
            }
            if (toSeek != null) {
                // Nack path: commit what is pending, seek back to the collected records,
                // then pause for the requested sleep.
                if (!this.autoCommit) {
                    this.processCommits();
                }
                SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger);
                this.pauseForNackSleep();
            }
        }

        /**
         * Queues the highest-offset records of the batch, as returned by
         * {@code getHighestOffsetRecords}, onto the acks queue.
         *
         * @param records the batch to acknowledge
         * @throws InterruptedException if interrupted while waiting to enqueue
         */
        private void ackBatch(ConsumerRecords<K, V> records) throws InterruptedException {
            Iterator<ConsumerRecord<K, V>> highest = this.getHighestOffsetRecords(records).iterator();
            while (highest.hasNext()) {
                this.acks.put(highest.next());
            }
        }

        /**
         * Runs the batch interceptor (if any) and then calls the batch listener,
         * recording timer and interceptor outcomes.
         * <p>
         * If the interceptor returns {@code null} the batch is skipped. When the
         * listener throws a runtime exception, {@code batchFailed} is set, the failure
         * is reported to the timer and interceptor, and the exception is rethrown.
         *
         * @param recordsArg the batch as polled
         * @param recordListArg the batch as a list; replaced by a fresh list when an
         * interceptor is configured
         */
        private void invokeBatchOnMessageWithRecordsOrList(ConsumerRecords<K, V> recordsArg, @Nullable List<ConsumerRecord<K, V>> recordListArg) {
            if (this.listenerinfo != null) {
                for (ConsumerRecord<K, V> cRecord : recordsArg) {
                    this.listenerInfo(cRecord);
                }
            }
            ConsumerRecords<K, V> toDeliver = recordsArg;
            List<ConsumerRecord<K, V>> listToDeliver = recordListArg;
            if (this.batchInterceptor != null) {
                toDeliver = this.batchInterceptor.intercept(recordsArg, this.consumer);
                if (toDeliver == null) {
                    this.logger.debug(() -> "BatchInterceptor returned null, skipping: " + String.valueOf(recordsArg) + " with " + recordsArg.count() + " records");
                    return;
                }
                listToDeliver = this.createRecordList(toDeliver);
            }
            Object sample = this.startMicrometerSample();
            try {
                if (!this.wantsFullRecords) {
                    this.doInvokeBatchOnMessage(toDeliver, listToDeliver);
                } else {
                    Acknowledgment ack = this.isAnyManualAck ? new ConsumerBatchAcknowledgment(toDeliver, listToDeliver) : null;
                    this.batchListener.onMessage(toDeliver, ack, this.consumer);
                }
                this.batchInterceptAfter(toDeliver, null);
                this.successTimer(sample, null);
            }
            catch (RuntimeException listenerFailure) {
                this.batchFailed = true;
                this.failureTimer(sample, null, listenerFailure);
                this.batchInterceptAfter(toDeliver, listenerFailure);
                throw listenerFailure;
            }
        }

        /**
         * Calls the {@code onMessage} variant of the batch listener that matches
         * {@code listenerType}. An acknowledgment object is created only for the
         * acknowledging variants, and only when manual acks are in use.
         *
         * @param records the batch, used to build the acknowledgment
         * @param recordList the batch as a list, passed to the listener
         * @throws RuntimeException the result of {@code decorateException} for any
         * exception thrown by the listener
         */
        private void doInvokeBatchOnMessage(ConsumerRecords<K, V> records, @Nullable List<ConsumerRecord<K, V>> recordList) {
            try {
                switch (this.listenerType) {
                    case SIMPLE: {
                        this.batchListener.onMessage(recordList);
                        break;
                    }
                    case CONSUMER_AWARE: {
                        this.batchListener.onMessage(recordList, this.consumer);
                        break;
                    }
                    case ACKNOWLEDGING: {
                        Acknowledgment ack = this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records, recordList) : null;
                        this.batchListener.onMessage(recordList, ack);
                        break;
                    }
                    case ACKNOWLEDGING_CONSUMER_AWARE: {
                        Acknowledgment ack = this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records, recordList) : null;
                        this.batchListener.onMessage(recordList, ack, this.consumer);
                        break;
                    }
                }
            }
            catch (Exception listenerFailure) {
                throw this.decorateException(listenerFailure);
            }
        }

        /**
         * Passes a failed batch to the common error handler.
         * <p>
         * {@code handleBatch} is used when the handler seeks after handling, a
         * transaction manager is present, or the failure is a
         * {@code CommitFailedException}. Otherwise {@code handleBatchAndReturnRemaining}
         * is used and any non-empty result is kept as {@code remainingRecords} with
         * {@code pauseForPending} set. Both calls receive a callback that re-invokes
         * the listener with the same batch.
         *
         * @param records the failed batch
         * @param list the failed batch as a list
         * @param rte the listener failure
         */
        private void invokeBatchErrorHandler(ConsumerRecords<K, V> records, @Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {
            Runnable reinvokeListener = () -> this.invokeBatchOnMessageWithRecordsOrList(records, list);
            boolean handleWholeBatch = this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null || rte instanceof CommitFailedException;
            if (handleWholeBatch) {
                this.commonErrorHandler.handleBatch(rte, records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, reinvokeListener);
                return;
            }
            ConsumerRecords<K, V> leftOver = this.commonErrorHandler.handleBatchAndReturnRemaining(rte, records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, reinvokeListener);
            if (leftOver == null || leftOver.isEmpty()) {
                return;
            }
            this.remainingRecords = leftOver;
            this.pauseForPending = true;
        }

        /**
         * Delivers the records one at a time, in a transaction per record when a
         * transaction template is configured.
         *
         * @param records the polled records
         */
        private void invokeRecordListener(ConsumerRecords<K, V> records) {
            if (this.transactionTemplate == null) {
                this.doInvokeWithRecords(records);
                return;
            }
            this.invokeRecordListenerInTx(records);
        }

        /**
         * Delivers each record to the listener inside its own transaction.
         * <p>
         * The loop ends early when an immediate stop was requested and the container is
         * no longer running, on fencing, after a nack, or when an immediate pause takes
         * over the rest of the records. A runtime exception from the transaction is
         * logged and passed to {@code recordAfterRollback}.
         *
         * @param records the polled records
         * @throws StopAfterFenceException on fencing when the container is configured to
         * stop when fenced
         */
        private void invokeRecordListenerInTx(ConsumerRecords<K, V> records) {
            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
            while (iterator.hasNext() && (!this.stopImmediate || KafkaMessageListenerContainer.this.isRunning())) {
                ConsumerRecord<K, V> cRecord = this.checkEarlyIntercept(iterator.next());
                if (cRecord == null) {
                    // The early interceptor dropped this record.
                    continue;
                }
                this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord));
                try {
                    this.invokeInTransaction(iterator, cRecord);
                }
                catch (FencedInstanceIdException | ProducerFencedException fenced) {
                    this.logger.error(fenced, (CharSequence)"Producer or 'group.instance.id' fenced during transaction");
                    if (this.containerProperties.isStopContainerWhenFenced()) {
                        throw new StopAfterFenceException("Container stopping due to fencing", fenced);
                    }
                    break;
                }
                catch (RuntimeException rolledBack) {
                    this.logger.error((Throwable)rolledBack, (CharSequence)"Transaction rolled back");
                    this.recordAfterRollback(iterator, cRecord, rolledBack);
                }
                if (this.commonRecordInterceptor != null) {
                    this.commonRecordInterceptor.afterRecord(cRecord, this.consumer);
                }
                if (this.nackSleepDurationMillis >= 0L) {
                    this.handleNack(records, cRecord);
                    break;
                }
                if (this.checkImmediatePause(iterator)) {
                    break;
                }
            }
        }

        /**
         * Runs the record listener for one record inside
         * {@code transactionTemplate.execute(...)}.
         * <p>
         * When a Kafka transaction manager is present, the producer is looked up
         * before the listener runs. An exception returned by
         * {@code doInvokeRecordListener} is thrown from the callback.
         *
         * @param iterator the iterator over the rest of the records
         * @param cRecord the record to deliver
         */
        private void invokeInTransaction(final Iterator<ConsumerRecord<K, V>> iterator, final ConsumerRecord<K, V> cRecord) {
            TransactionCallbackWithoutResult listenerCall = new TransactionCallbackWithoutResult(){

                public void doInTransactionWithoutResult(TransactionStatus s) {
                    if (ListenerConsumer.this.kafkaTxManager != null) {
                        ListenerConsumer.this.producer = ListenerConsumer.this.getTxProducer();
                    }
                    RuntimeException aborted = ListenerConsumer.this.doInvokeRecordListener(cRecord, iterator);
                    if (aborted != null) {
                        throw aborted;
                    }
                }
            };
            this.transactionTemplate.execute((TransactionCallback)listenerCall);
        }

        /**
         * Passes the failed record and every record still left in the iterator to the
         * container's {@code AfterRollbackProcessor}.
         * <p>
         * If the processor reports {@code isProcessInTransaction()} and a transaction
         * template is configured, it runs inside {@code transactionTemplate.execute(...)}.
         * Otherwise it is called directly and any exception is logged and swallowed.
         *
         * @param iterator the iterator over the records after {@code cRecord}; it is drained
         * @param cRecord the record whose transaction rolled back
         * @param e the exception that caused the rollback
         */
        private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, ConsumerRecord<K, V> cRecord, final RuntimeException e) {
            final List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
            unprocessed.add(cRecord);
            iterator.forEachRemaining(unprocessed::add);
            final AfterRollbackProcessor afterRollbackProcessorToUse = KafkaMessageListenerContainer.this.getAfterRollbackProcessor();
            boolean inTransaction = afterRollbackProcessorToUse.isProcessInTransaction() && this.transactionTemplate != null;
            if (!inTransaction) {
                try {
                    afterRollbackProcessorToUse.process(unprocessed, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, e, true, this.eosMode);
                }
                catch (KafkaException ke) {
                    ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);
                }
                catch (Exception ex) {
                    this.logger.error((Throwable)ex, (CharSequence)"AfterRollbackProcessor threw exception");
                }
                return;
            }
            this.transactionTemplate.execute((TransactionCallback)new TransactionCallbackWithoutResult(){

                protected void doInTransactionWithoutResult(TransactionStatus status) {
                    afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, e, true, ListenerConsumer.this.eosMode);
                }
            });
        }

        /**
         * Delivers each record to the listener without a transaction.
         * <p>
         * The loop ends early when an immediate stop was requested and the container is
         * no longer running, after a nack, or when an immediate pause takes over the
         * rest of the records. The value returned by {@code doInvokeRecordListener} is
         * ignored here.
         *
         * @param records the polled records
         */
        private void doInvokeWithRecords(ConsumerRecords<K, V> records) {
            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
            while (iterator.hasNext() && (!this.stopImmediate || KafkaMessageListenerContainer.this.isRunning())) {
                ConsumerRecord<K, V> cRecord = this.checkEarlyIntercept(iterator.next());
                if (cRecord != null) {
                    this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord));
                    this.doInvokeRecordListener(cRecord, iterator);
                    if (this.commonRecordInterceptor != null) {
                        this.commonRecordInterceptor.afterRecord(cRecord, this.consumer);
                    }
                    if (this.nackSleepDurationMillis >= 0L) {
                        this.handleNack(records, cRecord);
                        break;
                    }
                    if (this.checkImmediatePause(iterator)) {
                        break;
                    }
                }
            }
        }

        /**
         * When a pause has been requested and {@code pauseImmediate} is set, drains the
         * iterator and stores the drained records, grouped by topic-partition, as
         * {@code remainingRecords}.
         *
         * @param iterator the iterator over the records not yet delivered
         * @return {@code true} if at least one record was stored; {@code false} otherwise
         */
        private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
            if (!(KafkaMessageListenerContainer.this.isPauseRequested() && this.pauseImmediate)) {
                return false;
            }
            Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new LinkedHashMap<>();
            while (iterator.hasNext()) {
                ConsumerRecord<K, V> pending = iterator.next();
                TopicPartition partition = new TopicPartition(pending.topic(), pending.partition());
                remaining.computeIfAbsent(partition, tp -> new ArrayList<>()).add(pending);
            }
            if (remaining.isEmpty()) {
                return false;
            }
            this.remainingRecords = new ConsumerRecords<>(remaining);
            return true;
        }

        /**
         * Applies the early batch interceptor, if one is configured.
         * <p>
         * When the interceptor returns {@code null}, the original batch is acknowledged
         * and the interceptor's {@code success} callback is invoked with it.
         *
         * @param nextArg the polled batch
         * @return the batch to process: {@code nextArg} when there is no early
         * interceptor, otherwise whatever the interceptor returned (possibly {@code null})
         */
        @Nullable
        private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg) {
            if (this.earlyBatchInterceptor == null) {
                return nextArg;
            }
            ConsumerRecords<K, V> intercepted = this.earlyBatchInterceptor.intercept(nextArg, this.consumer);
            if (intercepted == null) {
                this.logger.debug(() -> "BatchInterceptor returned null, skipping: " + String.valueOf(nextArg) + " with " + nextArg.count() + " records");
                try {
                    this.ackBatch(nextArg);
                }
                catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and carry on.
                    Thread.currentThread().interrupt();
                }
                this.earlyBatchInterceptor.success(nextArg, this.consumer);
            }
            return intercepted;
        }

        /**
         * Adds the internal headers to the record and applies the early record
         * interceptor, if one is configured.
         * <p>
         * When the interceptor returns {@code null}, the original record is acknowledged
         * and the interceptor's {@code success} and {@code afterRecord} callbacks are
         * invoked with it.
         *
         * @param recordArg the polled record
         * @return the record to process: {@code recordArg} when there is no early
         * interceptor, otherwise whatever the interceptor returned (possibly {@code null})
         */
        @Nullable
        private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
            this.internalHeaders(recordArg);
            if (this.earlyRecordInterceptor == null) {
                return recordArg;
            }
            ConsumerRecord<K, V> intercepted = this.earlyRecordInterceptor.intercept(recordArg, this.consumer);
            if (intercepted == null) {
                this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + KafkaUtils.format(recordArg));
                this.ackCurrent(recordArg);
                this.earlyRecordInterceptor.success(recordArg, this.consumer);
                this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
            }
            return intercepted;
        }

        /**
         * Adds container-generated headers to the record: the delivery attempt, as a
         * 4-byte big-endian int under {@code kafka_deliveryAttempt}, when a
         * delivery-attempt source is present, and the listener info header when
         * listener info is configured.
         *
         * @param cRecord the record whose headers are extended
         */
        private void internalHeaders(ConsumerRecord<K, V> cRecord) {
            if (this.deliveryAttemptAware != null) {
                TopicPartitionOffset position = new TopicPartitionOffset(cRecord.topic(), cRecord.partition(), cRecord.offset());
                int attempt = this.deliveryAttemptAware.deliveryAttempt(position);
                byte[] encoded = ByteBuffer.allocate(Integer.BYTES).putInt(attempt).array();
                cRecord.headers().add((Header)new RecordHeader("kafka_deliveryAttempt", encoded));
            }
            if (this.listenerinfo != null) {
                this.listenerInfo(cRecord);
            }
        }

        /**
         * Adds {@code this.infoHeader} to the headers of the given record.
         *
         * @param cRecord the record whose headers are extended
         */
        private void listenerInfo(ConsumerRecord<K, V> cRecord) {
            cRecord.headers().add(this.infoHeader);
        }

        /**
         * Handles a nack in record mode: commits pending offsets when appropriate,
         * seeks back to the nacked record and everything after it in the batch, then
         * pauses for the nack sleep.
         *
         * @param records the batch being processed
         * @param cRecord the nacked record, matched by topic, partition and offset
         */
        private void handleNack(ConsumerRecords<K, V> records, ConsumerRecord<K, V> cRecord) {
            if (!(this.autoCommit || this.isRecordAck)) {
                this.processCommits();
            }
            List<ConsumerRecord<?, ?>> toSeek = new ArrayList<>();
            boolean reachedNacked = false;
            for (ConsumerRecord<K, V> candidate : records) {
                if (!reachedNacked && this.recordsEqual(cRecord, candidate)) {
                    reachedNacked = true;
                }
                if (reachedNacked) {
                    toSeek.add(candidate);
                }
            }
            SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger);
            this.pauseForNackSleep();
        }

        /**
         * Compares two records by position only.
         *
         * @param rec1 the first record
         * @param rec2 the second record
         * @return {@code true} when topic, partition and offset all match
         */
        private boolean recordsEqual(ConsumerRecord<K, V> rec1, ConsumerRecord<K, V> rec2) {
            if (!rec1.topic().equals(rec2.topic())) {
                return false;
            }
            if (rec1.partition() != rec2.partition()) {
                return false;
            }
            return rec1.offset() == rec2.offset();
        }

        /**
         * Pauses the assigned partitions for the requested nack sleep and then clears
         * the nack request.
         * <p>
         * With a positive sleep duration, the wake-up time is recorded and the assigned
         * partitions that were not already paused are added to {@code pausedForNack}
         * and paused. If the pause fails with an {@code IllegalStateException}, the
         * partitions paused since the snapshot was taken are resumed. In every case
         * that returns normally, {@code nackSleepDurationMillis} is reset to -1.
         */
        private void pauseForNackSleep() {
            if (this.nackSleepDurationMillis > 0L) {
                this.nackWakeTimeMillis = System.currentTimeMillis() + this.nackSleepDurationMillis;
                Set<TopicPartition> pausedBefore = this.consumer.paused();
                Collection<TopicPartition> assignment = KafkaMessageListenerContainer.this.getAssignedPartitions();
                if (assignment != null) {
                    this.pausedForNack.addAll(assignment);
                }
                // Leave out partitions that were paused before the nack.
                this.pausedForNack.removeAll(pausedBefore);
                this.logger.debug(() -> "Pausing for nack sleep: " + String.valueOf(this.pausedForNack));
                try {
                    this.consumer.pause(this.pausedForNack);
                    KafkaMessageListenerContainer.this.publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received");
                }
                catch (IllegalStateException pauseFailure) {
                    this.logger.warn(() -> "Could not pause for nack, possible rebalance in process: " + pauseFailure.getMessage());
                    Set<TopicPartition> newlyPaused = new HashSet<>(this.consumer.paused());
                    newlyPaused.removeAll(pausedBefore);
                    this.consumer.resume(newlyPaused);
                }
            }
            this.nackSleepDurationMillis = -1L;
        }

        /**
         * Looks up the producer held by the resource that is bound to the Kafka
         * transaction manager's producer factory.
         *
         * @return the producer from the bound {@code KafkaResourceHolder}
         */
        private Producer<?, ?> getTxProducer() {
            Object boundResource = TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory());
            return ((KafkaResourceHolder)boundResource).getProducer();
        }

        /**
         * Invokes the record listener inside a timer sample and an observation scope
         * and, if it throws, routes the failure to the common error handler.
         * <p>
         * The observation scope is always closed. The observation itself is stopped
         * here only when the listener adapter is not observation aware.
         *
         * @param cRecord the record to deliver
         * @param iterator the iterator over the rest of the records, passed to the error handler
         * @return {@code null} when the listener (or the error handler) completed;
         * otherwise the runtime exception thrown by the error handler
         * @throws RuntimeException the listener's exception when no common error handler is set
         */
        @Nullable
        private RuntimeException doInvokeRecordListener(ConsumerRecord<K, V> cRecord, Iterator<ConsumerRecord<K, V>> iterator) {
            Object sample = this.startMicrometerSample();
            Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(this.containerProperties.getObservationConvention(), KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE, () -> new KafkaRecordReceiverContext(cRecord, KafkaMessageListenerContainer.this.getListenerId(), this.getClientId(), this.consumerGroupId, this::clusterId), this.observationRegistry);
            observation.start();
            Observation.Scope observationScope = observation.openScope();
            try {
                this.invokeOnMessage(cRecord);
                this.successTimer(sample, cRecord);
                this.recordInterceptAfter(cRecord, null);
                return null;
            }
            catch (RuntimeException listenerFailure) {
                this.failureTimer(sample, cRecord, listenerFailure);
                this.recordInterceptAfter(cRecord, listenerFailure);
                if (!this.isListenerAdapterObservationAware()) {
                    observation.error((Throwable)listenerFailure);
                }
                if (this.commonErrorHandler == null) {
                    throw listenerFailure;
                }
                try {
                    this.invokeErrorHandler(cRecord, iterator, listenerFailure);
                    this.commitOffsetsIfNeededAfterHandlingError(cRecord);
                    return null;
                }
                catch (RecordInRetryException retrying) {
                    this.logger.info((CharSequence)"Record in retry and not yet recovered");
                    return retrying;
                }
                catch (KafkaException kafkaFailure) {
                    kafkaFailure.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
                    return kafkaFailure;
                }
                catch (RuntimeException handlerFailure) {
                    this.logger.error((Throwable)handlerFailure, (CharSequence)ERROR_HANDLER_THREW_AN_EXCEPTION);
                    return handlerFailure;
                }
                catch (Error fatal) {
                    this.logger.error((Throwable)fatal, (CharSequence)"Error handler threw an error");
                    throw fatal;
                }
            }
            finally {
                if (!this.isListenerAdapterObservationAware()) {
                    observation.stop();
                }
                observationScope.close();
            }
        }

        /**
         * Acknowledges a single record after the error handler has dealt with it.
         * <p>
         * Nothing happens unless either a producer is present, or auto-commit is off,
         * the error handler reports {@code isAckAfterHandle()} and a consumer group id
         * is set. The record is also left alone when it is the first element of
         * {@code remainingRecords}. Otherwise it is acknowledged in order when
         * {@code offsetsInThisBatch} is set, or acknowledged directly.
         *
         * @param cRecord the record that was handled
         */
        private void commitOffsetsIfNeededAfterHandlingError(ConsumerRecord<K, V> cRecord) {
            boolean shouldAck = !this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null || this.producer != null;
            if (!shouldAck) {
                return;
            }
            boolean stillPending = this.remainingRecords != null && cRecord.equals(this.remainingRecords.iterator().next());
            if (stillPending) {
                return;
            }
            if (this.offsetsInThisBatch == null) {
                this.ackCurrent(cRecord, this.isManualAck);
            } else {
                this.ackInOrder(cRecord);
            }
        }

        /**
         * Notifies the common record interceptor, if any, of the outcome for a record.
         * Exceptions thrown by the interceptor are logged and not propagated.
         *
         * @param records the record that was processed
         * @param exception the listener failure, or {@code null} on success
         */
        private void recordInterceptAfter(ConsumerRecord<K, V> records, @Nullable Exception exception) {
            if (this.commonRecordInterceptor == null) {
                return;
            }
            try {
                if (exception != null) {
                    this.commonRecordInterceptor.failure(records, exception, this.consumer);
                } else {
                    this.commonRecordInterceptor.success(records, this.consumer);
                }
            }
            catch (Exception interceptorFailure) {
                this.logger.error((Throwable)interceptorFailure, (CharSequence)"RecordInterceptor.success/failure threw an exception");
            }
        }

        /**
         * Validates a record for deserialization failures, delivers it to the
         * listener, and performs the follow-up acknowledgment.
         * <p>
         * A {@code DeserializationException} carried as the value or key is thrown as
         * is. A {@code null} value or key is passed to {@code checkDeser} with the
         * matching header name when the corresponding check flag is set. After
         * delivery, the record is acknowledged unless a nack is pending or
         * manual-immediate acks are in use, and commits are processed for count- or
         * time-based ack modes.
         *
         * @param cRecord the record to deliver
         */
        private void invokeOnMessage(ConsumerRecord<K, V> cRecord) {
            Object value = cRecord.value();
            if (value instanceof DeserializationException) {
                throw (DeserializationException)value;
            }
            Object key = cRecord.key();
            if (key instanceof DeserializationException) {
                throw (DeserializationException)key;
            }
            if (value == null && this.checkNullValueForExceptions) {
                this.checkDeser(cRecord, "springDeserializerExceptionValue");
            }
            if (key == null && this.checkNullKeyForExceptions) {
                this.checkDeser(cRecord, "springDeserializerExceptionKey");
            }
            this.doInvokeOnMessage(cRecord);
            boolean nackPending = this.nackSleepDurationMillis >= 0L;
            if (!nackPending && !this.isManualImmediateAck) {
                this.ackCurrent(cRecord);
            }
            if (this.isCountAck || this.isTimeOnlyAck) {
                this.doProcessCommits();
            }
        }

        /**
         * Applies the record interceptor (if any) and calls the {@code onMessage}
         * variant of the listener that matches {@code listenerType}. A {@code null}
         * record after interception is logged and skipped.
         *
         * @param recordArg the record to deliver
         * @throws RuntimeException the result of {@code decorateException} for any
         * exception thrown by the listener
         */
        private void doInvokeOnMessage(ConsumerRecord<K, V> recordArg) {
            ConsumerRecord<K, V> toDeliver = this.recordInterceptor == null ? recordArg : this.recordInterceptor.intercept(recordArg, this.consumer);
            if (toDeliver == null) {
                this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + KafkaUtils.format(recordArg));
                return;
            }
            try {
                switch (this.listenerType) {
                    case SIMPLE: {
                        this.listener.onMessage(toDeliver);
                        break;
                    }
                    case CONSUMER_AWARE: {
                        this.listener.onMessage(toDeliver, this.consumer);
                        break;
                    }
                    case ACKNOWLEDGING: {
                        this.listener.onMessage(toDeliver, this.isAnyManualAck ? new ConsumerAcknowledgment(toDeliver) : null);
                        break;
                    }
                    case ACKNOWLEDGING_CONSUMER_AWARE: {
                        this.listener.onMessage(toDeliver, this.isAnyManualAck ? new ConsumerAcknowledgment(toDeliver) : null, this.consumer);
                        break;
                    }
                }
            }
            catch (Exception listenerFailure) {
                throw this.decorateException(listenerFailure);
            }
        }

        /**
         * Passes one failed record, with no following records, to the common error
         * handler.
         * <p>
         * This is the single-record case of {@code invokeErrorHandler}: with an empty
         * iterator, only the failed record is handed to {@code handleRemaining} or, if
         * {@code handleOne} does not handle it, retained as {@code remainingRecords}.
         *
         * @param failedRecordTuple the failed record together with its exception
         */
        private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedRecordTuple) {
            ConsumerRecord<K, V> failed = failedRecordTuple.record;
            RuntimeException failure = failedRecordTuple.ex;
            this.invokeErrorHandler(failed, Collections.emptyIterator(), failure);
        }

        /**
         * Passes a failed record to the common error handler.
         * <p>
         * When the handler seeks after handling, or the failure is a
         * {@code CommitFailedException}, pending commits are processed first (only
         * when no producer is present; a commit failure is logged), and the failed
         * record plus everything left in the iterator goes to {@code handleRemaining}.
         * Otherwise {@code handleOne} is tried; an exception from it is logged and
         * treated as "not handled". If the record was not handled, it and the rest of
         * the iterator are kept as {@code remainingRecords}, grouped by
         * topic-partition, and {@code pauseForPending} is set.
         *
         * @param cRecord the failed record
         * @param iterator the iterator over the records after {@code cRecord}
         * @param rte the listener failure
         */
        private void invokeErrorHandler(ConsumerRecord<K, V> cRecord, Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
            boolean handleAllRemaining = this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException;
            if (handleAllRemaining) {
                try {
                    if (this.producer == null) {
                        this.processCommits();
                    }
                }
                catch (Exception commitFailure) {
                    this.logger.error((Throwable)commitFailure, (CharSequence)"Failed to commit before handling error");
                }
                List<ConsumerRecord<?, ?>> unprocessed = new ArrayList<>();
                unprocessed.add(cRecord);
                iterator.forEachRemaining(unprocessed::add);
                this.commonErrorHandler.handleRemaining(rte, unprocessed, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
                return;
            }
            boolean handled = false;
            try {
                handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer);
            }
            catch (Exception handlerFailure) {
                this.logger.error((Throwable)handlerFailure, (CharSequence)"ErrorHandler threw unexpected exception");
            }
            if (handled) {
                return;
            }
            Map<TopicPartition, List<ConsumerRecord<K, V>>> pending = new LinkedHashMap<>();
            pending.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()), tp -> new ArrayList<>()).add(cRecord);
            while (iterator.hasNext()) {
                ConsumerRecord<K, V> following = iterator.next();
                pending.computeIfAbsent(new TopicPartition(following.topic(), following.partition()), tp -> new ArrayList<>()).add(following);
            }
            this.remainingRecords = new ConsumerRecords<>(pending);
            this.pauseForPending = true;
        }

        /**
         * Wraps a failure in a {@code ListenerExecutionFailedException} that carries this
         * consumer's group id.
         * <p>
         * An exception that already is a {@code ListenerExecutionFailedException} is re-created
         * with the same message and cause plus the group id, and {@link #fixStackTrace} is asked
         * to preserve the frames of the original. Any other exception becomes the cause of a new
         * "Listener failed" exception.
         *
         * @param ex the exception to decorate
         * @return the decorated exception
         */
        private RuntimeException decorateException(Exception ex) {
            ListenerExecutionFailedException decorated;
            if (ex instanceof ListenerExecutionFailedException) {
                decorated = new ListenerExecutionFailedException(ex.getMessage(), this.consumerGroupId, ex.getCause());
                // The new instance has a fresh stack trace; keep the original frames reachable.
                this.fixStackTrace(ex, decorated);
            } else {
                decorated = new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, ex);
            }
            return decorated;
        }

        /**
         * Preserves the leading stack frames of {@code ex} on {@code toHandle}.
         * <p>
         * Finds the first frame of {@code ex} that also occurs in the stack trace of
         * {@code toHandle}; the frames of {@code ex} before that one are attached to
         * {@code toHandle} as the stack trace of a suppressed "Restored Stack Trace" exception.
         * Nothing is attached when the two traces share no frame. Any failure here is only
         * logged at debug level.
         *
         * @param ex the original exception whose frames should be kept
         * @param toHandle the replacement exception that receives the suppressed exception
         */
        private void fixStackTrace(Exception ex, Exception toHandle) {
            try {
                StackTraceElement[] originalTrace = ex.getStackTrace();
                if (originalTrace == null || originalTrace.length == 0) {
                    return;
                }
                StackTraceElement[] replacementTrace = toHandle.getStackTrace();
                if (replacementTrace == null) {
                    return;
                }
                List<StackTraceElement> replacementFrames = Arrays.asList(replacementTrace);
                int matching = -1;
                // Bound by the array being indexed; the traces generally differ in length.
                for (int i = 0; i < originalTrace.length; ++i) {
                    if (replacementFrames.contains(originalTrace[i])) {
                        matching = i;
                        break;
                    }
                }
                if (matching >= 0) {
                    ListenerExecutionFailedException suppressed = new ListenerExecutionFailedException("Restored Stack Trace");
                    suppressed.setStackTrace(Arrays.copyOf(originalTrace, matching));
                    toHandle.addSuppressed(suppressed);
                }
            }
            catch (Exception ex2) {
                this.logger.debug((Throwable)ex2, (CharSequence)"Could not restore the stack trace when decorating the LEFE with the group id");
            }
        }

        /**
         * Throws if the given header of the record carries a deserialization exception.
         *
         * @param cRecord the record to inspect
         * @param headerName the header that may hold the exception
         * @throws RuntimeException the decorated deserialization exception, when one is found
         */
        public void checkDeser(ConsumerRecord<K, V> cRecord, String headerName) {
            DeserializationException fromHeader = SerializationUtils.getExceptionFromHeader(cRecord, headerName, this.logger);
            if (fromHeader == null) {
                return;
            }
            throw this.decorateException(fromHeader);
        }

        /**
         * Acknowledges the record; equivalent to {@code ackCurrent(cRecord, false)}.
         *
         * @param cRecord the record to acknowledge
         */
        public void ackCurrent(ConsumerRecord<K, V> cRecord) {
            this.ackCurrent(cRecord, false);
        }

        /**
         * Acknowledges the record according to the container's acknowledgment settings.
         * <ul>
         * <li>With a producer: the record is added to {@code acks} and the offsets are sent
         * to the transaction.</li>
         * <li>Without a producer and with {@code isRecordAck}: the record's offset is
         * committed right away.</li>
         * <li>Otherwise the record is added to {@code acks}, unless {@code autoCommit} is set
         * or a manual ack mode is active and {@code commitRecovered} is {@code false}.</li>
         * </ul>
         *
         * @param cRecord the record to acknowledge
         * @param commitRecovered lets the record be queued even when a manual ack mode is active
         */
        public void ackCurrent(ConsumerRecord<K, V> cRecord, boolean commitRecovered) {
            if (this.producer != null) {
                this.acks.add(cRecord);
                this.sendOffsetsToTransaction();
                return;
            }
            if (this.isRecordAck) {
                Map<TopicPartition, OffsetAndMetadata> single = this.buildSingleCommits(cRecord);
                this.commitLogger.log(() -> COMMITTING + String.valueOf(single));
                this.commitOffsets(single);
                return;
            }
            boolean leftToManualAck = this.isAnyManualAck && !commitRecovered;
            if (!this.autoCommit && !leftToManualAck) {
                this.acks.add(cRecord);
            }
        }

        /**
         * Processes the queued acks, builds the resulting commits and sends them to the
         * transaction through this consumer's producer.
         */
        private void sendOffsetsToTransaction() {
            this.handleAcks();
            Map<TopicPartition, OffsetAndMetadata> pendingOffsets = this.buildCommits();
            this.commitLogger.log(() -> "Sending offsets to transaction: " + String.valueOf(pendingOffsets));
            this.doSendOffsets(this.producer, pendingOffsets);
        }

        /**
         * Sends the offsets to the producer's transaction; does nothing for an empty map.
         * When {@code fixTxOffsets} is set, the sent offsets are also recorded in
         * {@code lastCommits}.
         *
         * @param prod the producer whose transaction receives the offsets
         * @param commits the offsets to send
         */
        private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
            if (!CollectionUtils.isEmpty(commits)) {
                prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata());
                if (this.fixTxOffsets) {
                    this.lastCommits.putAll(commits);
                }
            }
        }

        /**
         * Adds the number of queued acks to {@code count}, processes the acks, then commits
         * according to the ack mode: count-based, time-based, or (unless manual-immediate)
         * an unconditional commit followed by resetting {@code count}.
         */
        private void processCommits() {
            this.count += this.acks.size();
            this.handleAcks();
            if (this.isCountAck) {
                this.countAcks();
                return;
            }
            if (this.isTimeAck) {
                this.timedAcks();
                return;
            }
            if (this.isManualImmediateAck) {
                return;
            }
            this.commitIfNecessary();
            this.count = 0;
        }

        /**
         * Commits when count-based acking is active and {@code count} has reached the
         * configured ack count; afterwards resets {@code count} and, in {@code COUNT_TIME}
         * mode, also restarts the time window.
         */
        private void countAcks() {
            if (!this.isCountAck || this.count < this.containerProperties.getAckCount()) {
                return;
            }
            this.logger.debug(() -> "Committing in " + this.ackMode.name() + " because count " + this.count + " exceeds configured limit of " + this.containerProperties.getAckCount());
            this.commitIfNecessary();
            this.count = 0;
            if (ContainerProperties.AckMode.COUNT_TIME.equals((Object)this.ackMode)) {
                this.last = System.currentTimeMillis();
            }
        }

        /**
         * Commits when time-based acking is active and more than the configured ack time has
         * passed since {@code last} (compared using {@link System#currentTimeMillis()});
         * afterwards records the new {@code last} and, in {@code COUNT_TIME} mode, also resets
         * {@code count}.
         */
        private void timedAcks() {
            long now = System.currentTimeMillis();
            boolean elapsed = this.isTimeAck && now - this.last > this.containerProperties.getAckTime();
            if (elapsed) {
                // The leading space keeps the ack mode name from running into "because".
                this.logger.debug(() -> "Committing in " + this.ackMode.name() + " because time elapsed exceeds configured limit of " + this.containerProperties.getAckTime());
                this.commitIfNecessary();
                this.last = now;
                if (ContainerProperties.AckMode.COUNT_TIME.equals((Object)this.ackMode)) {
                    this.count = 0;
                }
            }
        }

        /**
         * Tells whether the partition is in the given assignment, logging a warning when it
         * is not.
         *
         * @param assigned the current assignment, may be {@code null}
         * @param topicPartition the partition about to be sought
         * @return {@code true} if {@code assigned} contains the partition
         */
        private boolean checkPartitionAssignedBeforeSeek(@Nullable Collection<TopicPartition> assigned, TopicPartition topicPartition) {
            boolean stillAssigned = assigned != null && assigned.contains(topicPartition);
            if (!stillAssigned) {
                this.logger.warn((CharSequence)("No current assignment for partition '" + String.valueOf(topicPartition) + "' due to partition reassignment prior to seeking."));
            }
            return stillAssigned;
        }

        /**
         * Drains the {@code seeks} queue and applies every request to the consumer.
         * <p>
         * Timestamp seeks already queued are first handled together by
         * {@link #processTimestampSeeks}. Each remaining request is skipped when its partition
         * is no longer assigned; otherwise it is applied as an absolute, relative-to-current,
         * function-computed, timestamp, or beginning/end (optionally with an offset added)
         * seek. A failure on one request is logged and the next one is processed.
         */
        private void processSeeks() {
            Collection<TopicPartition> assigned = KafkaMessageListenerContainer.this.getAssignedPartitions();
            this.processTimestampSeeks(assigned);
            for (TopicPartitionOffset request = (TopicPartitionOffset)this.seeks.poll(); request != null; request = (TopicPartitionOffset)this.seeks.poll()) {
                this.traceSeek(request);
                try {
                    TopicPartition topicPartition = request.getTopicPartition();
                    if (!this.checkPartitionAssignedBeforeSeek(assigned, topicPartition)) {
                        continue;
                    }
                    TopicPartitionOffset.SeekPosition position = request.getPosition();
                    Long whereTo = request.getOffset();
                    Function<Long, Long> offsetComputeFunction = request.getOffsetComputeFunction();
                    if (position == null) {
                        if (request.isRelativeToCurrent()) {
                            // Relative seeks never go below offset 0.
                            whereTo = Math.max(whereTo + this.consumer.position(topicPartition), 0L);
                        } else if (offsetComputeFunction != null) {
                            whereTo = offsetComputeFunction.apply(this.consumer.position(topicPartition));
                        }
                        this.consumer.seek(topicPartition, whereTo.longValue());
                    } else if (TopicPartitionOffset.SeekPosition.TIMESTAMP.equals(position)) {
                        // A timestamp seek that was queued after the grouped pass above.
                        this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, request.getOffset())).forEach((tp, ot) -> {
                            if (ot != null) {
                                this.consumer.seek(tp, ot.offset());
                            }
                        });
                    } else {
                        if (TopicPartitionOffset.SeekPosition.BEGINNING.equals(position)) {
                            this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
                        } else {
                            this.consumer.seekToEnd(Collections.singletonList(topicPartition));
                        }
                        if (whereTo != null) {
                            whereTo = whereTo + this.consumer.position(topicPartition);
                            this.consumer.seek(topicPartition, whereTo.longValue());
                        }
                    }
                }
                catch (Exception e) {
                    TopicPartitionOffset offsetToLog = request;
                    this.logger.error((Throwable)e, () -> "Exception while seeking " + String.valueOf(offsetToLog));
                }
            }
        }

        /**
         * Removes from {@code seeks} every request for an unassigned partition and every
         * timestamp request, then resolves all collected timestamps with a single
         * {@code offsetsForTimes} call and seeks each partition for which an offset was found.
         *
         * @param assigned the current assignment, may be {@code null}
         */
        private void processTimestampSeeks(@Nullable Collection<TopicPartition> assigned) {
            HashMap<TopicPartition, Long> byTimestamp = null;
            for (Iterator pending = this.seeks.iterator(); pending.hasNext(); ) {
                TopicPartitionOffset candidate = (TopicPartitionOffset)pending.next();
                if (!this.checkPartitionAssignedBeforeSeek(assigned, candidate.getTopicPartition())) {
                    pending.remove();
                    continue;
                }
                if (TopicPartitionOffset.SeekPosition.TIMESTAMP.equals(candidate.getPosition())) {
                    if (byTimestamp == null) {
                        byTimestamp = new HashMap<TopicPartition, Long>();
                    }
                    byTimestamp.put(candidate.getTopicPartition(), candidate.getOffset());
                    pending.remove();
                    this.traceSeek(candidate);
                }
            }
            if (byTimestamp != null) {
                this.consumer.offsetsForTimes(byTimestamp).forEach((tp, ot) -> {
                    if (ot != null) {
                        this.consumer.seek(tp, ot.offset());
                    }
                });
            }
        }

        /**
         * Logs the seek request at trace level.
         *
         * @param offset the seek request being processed
         */
        private void traceSeek(TopicPartitionOffset offset) {
            this.logger.trace(() -> "Seek: " + String.valueOf(offset));
        }

        /**
         * Applies the initial positions described by {@code definedPartitions}.
         * <p>
         * Works on a copy of {@code definedPartitions}: partitions marked BEGINNING, END or
         * TIMESTAMP are taken out of the copy, timestamps are resolved to offsets, and the
         * result is handed to {@link #doInitialSeeks}. Finally, if a seek-aware listener is
         * present, it is told about the defined partitions and their current positions.
         */
        private void initPartitionsIfNeeded() {
            // Mutable copy; entries are removed as they are classified below.
            LinkedHashMap<TopicPartition, OffsetMetadata> partitions = new LinkedHashMap<TopicPartition, OffsetMetadata>(this.definedPartitions);
            // Partitions that should start at the beginning.
            Set<TopicPartition> beginnings = partitions.entrySet().stream().filter(e -> TopicPartitionOffset.SeekPosition.BEGINNING.equals((Object)((OffsetMetadata)e.getValue()).seekPosition)).map(Map.Entry::getKey).collect(Collectors.toSet());
            beginnings.forEach(partitions::remove);
            // Partitions that should start at the end.
            Set<TopicPartition> ends = partitions.entrySet().stream().filter(e -> TopicPartitionOffset.SeekPosition.END.equals((Object)((OffsetMetadata)e.getValue()).seekPosition)).map(Map.Entry::getKey).collect(Collectors.toSet());
            ends.forEach(partitions::remove);
            // Partitions positioned by timestamp: partition -> timestamp (stored in the offset field).
            Map<TopicPartition, Long> times = partitions.entrySet().stream().filter(e -> TopicPartitionOffset.SeekPosition.TIMESTAMP.equals((Object)((OffsetMetadata)e.getValue()).seekPosition)).collect(Collectors.toMap(Map.Entry::getKey, entry -> ((OffsetMetadata)entry.getValue()).offset));
            if (!times.isEmpty()) {
                times.forEach((key, value) -> partitions.remove(key));
                Map offsetsForTimes = this.consumer.offsetsForTimes(times);
                offsetsForTimes.forEach((tp, off) -> {
                    if (off == null) {
                        // No offset found for the timestamp: treat the partition like an END seek.
                        ends.add((TopicPartition)tp);
                    } else {
                        // Put back as an absolute (not relative) offset for doInitialSeeks.
                        partitions.put((TopicPartition)tp, new OffsetMetadata(off.offset(), false, TopicPartitionOffset.SeekPosition.TIMESTAMP));
                    }
                });
            }
            this.doInitialSeeks(partitions, beginnings, ends);
            if (this.consumerSeekAwareListener != null) {
                // Report every defined partition together with the consumer's current position.
                this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream().map(tp -> new AbstractMap.SimpleEntry<TopicPartition, Long>((TopicPartition)tp, this.consumer.position(tp))).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)), this.seekCallback);
            }
        }

        /**
         * Performs the initial seeks: to the beginning for {@code beginnings}, to the end for
         * {@code ends}, and to a computed offset for every entry of {@code partitions} that
         * has a non-null offset.
         * <p>
         * A negative offset is applied relative to the end of the partition (or to the
         * current position when the entry is relative-to-current) and is clamped at 0; a
         * non-negative offset is absolute unless the entry is relative-to-current. A failed
         * seek is logged and does not stop the remaining seeks.
         *
         * @param partitions partitions with explicit offset metadata
         * @param beginnings partitions to seek to the beginning
         * @param ends partitions to seek to the end
         */
        private void doInitialSeeks(Map<TopicPartition, OffsetMetadata> partitions, Set<TopicPartition> beginnings, Set<TopicPartition> ends) {
            if (!beginnings.isEmpty()) {
                this.consumer.seekToBeginning(beginnings);
            }
            if (!ends.isEmpty()) {
                this.consumer.seekToEnd(ends);
            }
            for (Map.Entry<TopicPartition, OffsetMetadata> entry : partitions.entrySet()) {
                OffsetMetadata metadata = entry.getValue();
                Long requested = metadata.offset;
                if (requested != null) {
                    TopicPartition topicPartition = entry.getKey();
                    long target;
                    if (requested < 0L) {
                        if (!metadata.relativeToCurrent) {
                            this.consumer.seekToEnd(Collections.singletonList(topicPartition));
                        }
                        target = Math.max(0L, this.consumer.position(topicPartition) + requested);
                    } else if (metadata.relativeToCurrent) {
                        target = this.consumer.position(topicPartition) + requested;
                    } else {
                        target = requested;
                    }
                    try {
                        this.consumer.seek(topicPartition, target);
                        this.logReset(topicPartition, target);
                    }
                    catch (Exception e) {
                        this.logger.error((Throwable)e, () -> "Failed to set initial offset for " + String.valueOf(topicPartition) + " at " + target + ". Position is " + this.consumer.position(topicPartition));
                    }
                }
            }
        }

        /**
         * Logs, at debug level, that a partition was reset to an offset.
         *
         * @param topicPartition the partition that was sought
         * @param newOffset the offset it was sought to
         */
        private void logReset(TopicPartition topicPartition, long newOffset) {
            this.logger.debug(() -> "Reset " + String.valueOf(topicPartition) + " to offset " + newOffset);
        }

        /**
         * Records the record's offset in {@code offsets} for its partition, keeping the
         * highest offset seen so far.
         *
         * @param cRecord the record whose offset is tracked
         */
        private void addOffset(ConsumerRecord<K, V> cRecord) {
            TopicPartition partition = new TopicPartition(cRecord.topic(), cRecord.partition());
            this.offsets.merge(partition, cRecord.offset(), Long::max);
        }

        /**
         * Builds the commits from the tracked offsets and commits them when there are any.
         * A {@code WakeupException} during the commit is only logged at debug level.
         */
        private void commitIfNecessary() {
            Map<TopicPartition, OffsetAndMetadata> commits = this.buildCommits();
            this.logger.debug(() -> "Commit list: " + String.valueOf(commits));
            if (commits.isEmpty()) {
                return;
            }
            this.commitLogger.log(() -> COMMITTING + String.valueOf(commits));
            try {
                this.commitOffsets(commits);
            }
            catch (WakeupException wakeup) {
                this.logger.debug((CharSequence)"Woken up during commit");
            }
        }

        /**
         * Logs the commits and then either sends them to the producer's transaction (when a
         * producer exists) or commits them through the consumer.
         *
         * @param commits the offsets to commit
         */
        private void commitOffsetsInTransactions(Map<TopicPartition, OffsetAndMetadata> commits) {
            this.commitLogger.log(() -> COMMITTING + String.valueOf(commits));
            if (this.producer == null) {
                this.commitOffsets(commits);
                return;
            }
            this.doSendOffsets(this.producer, commits);
        }

        /**
         * Commits the offsets synchronously or asynchronously depending on
         * {@code syncCommits}; does nothing for an empty map.
         *
         * @param commits the offsets to commit
         */
        private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> commits) {
            if (!CollectionUtils.isEmpty(commits)) {
                if (this.syncCommits) {
                    this.commitSync(commits);
                } else {
                    this.commitAsync(commits);
                }
            }
        }

        /**
         * Commits the offsets asynchronously. On completion the configured commit callback is
         * invoked and, when the commit succeeded and {@code fixTxOffsets} is set, the offsets
         * are recorded in {@code lastCommits}.
         *
         * @param commits the offsets to commit
         */
        private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits) {
            OffsetCommitCallback onCompletion = (offsetsAttempted, exception) -> {
                this.commitCallback.onComplete(offsetsAttempted, exception);
                if (exception == null && this.fixTxOffsets) {
                    this.lastCommits.putAll(commits);
                }
            };
            this.consumer.commitAsync(commits, onCompletion);
        }

        /**
         * Commits the offsets synchronously, starting with a retry count of zero.
         *
         * @param commits the offsets to commit
         */
        private void commitSync(Map<TopicPartition, OffsetAndMetadata> commits) {
            this.doCommitSync(commits, 0);
        }

        /**
         * Commits the offsets synchronously, retrying on
         * {@link RetriableCommitFailedException} until the configured number of commit retries
         * is reached, at which point the exception is rethrown.
         * <p>
         * After a successful commit the offsets are recorded in {@code lastCommits} (when
         * {@code fixTxOffsets} is set) and their partitions are removed from
         * {@code commitsDuringRebalance}. A {@code RebalanceInProgressException} is logged and
         * the offsets are stored in {@code commitsDuringRebalance} instead.
         *
         * @param commits the offsets to commit
         * @param retries the number of retries already used
         */
        private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
            int attempt = retries;
            while (true) {
                try {
                    this.consumer.commitSync(commits, this.syncCommitTimeout);
                    if (this.fixTxOffsets) {
                        this.lastCommits.putAll(commits);
                    }
                    if (!this.commitsDuringRebalance.isEmpty()) {
                        this.commitsDuringRebalance.keySet().removeAll(commits.keySet());
                    }
                    return;
                }
                catch (RetriableCommitFailedException e) {
                    if (attempt >= this.containerProperties.getCommitRetries()) {
                        throw e;
                    }
                    ++attempt;
                }
                catch (RebalanceInProgressException e) {
                    this.logger.debug((Throwable)e, (CharSequence)"Non-fatal commit failure");
                    this.commitsDuringRebalance.putAll(commits);
                    return;
                }
            }
        }

        /**
         * Builds a single-entry commit map for the record: its partition mapped to the
         * record's offset plus one.
         *
         * @param cRecord the record to commit
         * @return an immutable map with exactly one entry
         */
        Map<TopicPartition, OffsetAndMetadata> buildSingleCommits(ConsumerRecord<K, V> cRecord) {
            TopicPartition partition = new TopicPartition(cRecord.topic(), cRecord.partition());
            OffsetAndMetadata nextOffset = this.createOffsetAndMetadata(cRecord.offset() + 1L);
            return Collections.singletonMap(partition, nextOffset);
        }

        /**
         * Converts every tracked offset into a commit for that offset plus one and clears the
         * tracked offsets.
         *
         * @return the commits, in the iteration order of {@code offsets}; possibly empty
         */
        private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
            Map<TopicPartition, OffsetAndMetadata> commits = new LinkedHashMap<>();
            for (Map.Entry<TopicPartition, Long> tracked : this.offsets.entrySet()) {
                commits.put(tracked.getKey(), this.createOffsetAndMetadata(tracked.getValue() + 1L));
            }
            this.offsets.clear();
            return commits;
        }

        /**
         * Returns, for each partition in the batch, the last record of that partition's list.
         *
         * @param records the polled batch
         * @return one record per partition contained in {@code records}
         */
        private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords<K, V> records) {
            Map<TopicPartition, ConsumerRecord<K, V>> lastPerPartition = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
                lastPerPartition.put(partition, partitionRecords.get(partitionRecords.size() - 1));
            }
            return lastPerPartition.values();
        }

        /**
         * Appends the failed record and its exception to the end of {@code failedRecords}.
         *
         * @param cRecord the record that failed
         * @param ex the failure
         */
        private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
            this.failedRecords.addLast(new FailedRecordTuple(cRecord, ex));
        }

        /**
         * Queues a seek to an absolute offset; the request is added to {@code seeks} rather
         * than applied to the consumer here.
         *
         * @param topic the topic
         * @param partition the partition
         * @param offset the offset to seek to
         */
        @Override
        public void seek(String topic, int partition, long offset) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, offset));
        }

        /**
         * Queues a seek whose target offset is produced by the given function; the request is
         * added to {@code seeks} rather than applied to the consumer here.
         *
         * @param topic the topic
         * @param partition the partition
         * @param offsetComputeFunction function that computes the offset to seek to
         */
        @Override
        public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, offsetComputeFunction));
        }

        /**
         * Queues a seek to the beginning of the partition.
         *
         * @param topic the topic
         * @param partition the partition
         */
        @Override
        public void seekToBeginning(String topic, int partition) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, TopicPartitionOffset.SeekPosition.BEGINNING));
        }

        /**
         * Queues a seek to the beginning for each of the given partitions.
         *
         * @param partitions the partitions to seek
         */
        @Override
        public void seekToBeginning(Collection<TopicPartition> partitions) {
            this.seeks.addAll(partitions.stream().map(tp -> new TopicPartitionOffset(tp.topic(), tp.partition(), TopicPartitionOffset.SeekPosition.BEGINNING)).toList());
        }

        /**
         * Queues a seek to the end of the partition.
         *
         * @param topic the topic
         * @param partition the partition
         */
        @Override
        public void seekToEnd(String topic, int partition) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, TopicPartitionOffset.SeekPosition.END));
        }

        /**
         * Queues a seek to the end for each of the given partitions.
         *
         * @param partitions the partitions to seek
         */
        @Override
        public void seekToEnd(Collection<TopicPartition> partitions) {
            this.seeks.addAll(partitions.stream().map(tp -> new TopicPartitionOffset(tp.topic(), tp.partition(), TopicPartitionOffset.SeekPosition.END)).toList());
        }

        /**
         * Queues a relative seek. With {@code toCurrent} the offset is queued as relative to
         * the current position; otherwise a non-negative offset is queued against the
         * beginning of the partition and a negative offset against its end.
         *
         * @param topic the topic
         * @param partition the partition
         * @param offset the relative offset
         * @param toCurrent whether the offset is relative to the current position
         */
        @Override
        public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
            TopicPartitionOffset request;
            if (toCurrent) {
                request = new TopicPartitionOffset(topic, partition, (Long)offset, true);
            } else {
                TopicPartitionOffset.SeekPosition anchor = offset >= 0L ? TopicPartitionOffset.SeekPosition.BEGINNING : TopicPartitionOffset.SeekPosition.END;
                request = new TopicPartitionOffset(topic, partition, (Long)offset, anchor);
            }
            this.seeks.add(request);
        }

        /**
         * Queues a seek by timestamp; the timestamp is carried in the request's offset field
         * together with the TIMESTAMP seek position.
         *
         * @param topic the topic
         * @param partition the partition
         * @param timestamp the timestamp to seek to
         */
        @Override
        public void seekToTimestamp(String topic, int partition, long timestamp) {
            this.seeks.add(new TopicPartitionOffset(topic, partition, (Long)timestamp, TopicPartitionOffset.SeekPosition.TIMESTAMP));
        }

        /**
         * Queues a seek by timestamp for each of the given partitions.
         *
         * @param topicParts the partitions to seek
         * @param timestamp the timestamp to seek to
         */
        @Override
        public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestamp) {
            for (TopicPartition topicPart : topicParts) {
                this.seekToTimestamp(topicPart.topic(), topicPart.partition(), timestamp);
            }
        }

        /**
         * Returns the consumer group id held by this listener consumer.
         *
         * @return the value of {@code consumerGroupId}
         */
        @Override
        public String getGroupId() {
            return this.consumerGroupId;
        }

        /**
         * Returns a multi-line description listing the container properties, listener type,
         * listener flags, auto-commit setting, group id and client id suffix.
         */
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("KafkaMessageListenerContainer.ListenerConsumer [\ncontainerProperties=").append(String.valueOf(this.containerProperties));
            text.append("\nother properties [\n listenerType=").append(String.valueOf((Object)this.listenerType));
            text.append("\n isConsumerAwareListener=").append(this.isConsumerAwareListener);
            text.append("\n isBatchListener=").append(this.isBatchListener);
            text.append("\n autoCommit=").append(this.autoCommit);
            text.append("\n consumerGroupId=").append(this.consumerGroupId);
            text.append("\n clientIdSuffix=").append(KafkaMessageListenerContainer.this.clientIdSuffix);
            text.append("\n]");
            return text.toString();
        }

        /**
         * Creates the {@code OffsetAndMetadata} for an offset by delegating to the configured
         * {@code offsetAndMetadataProvider} with this consumer's listener metadata.
         *
         * @param offset the offset to wrap
         * @return the value produced by the provider
         */
        private OffsetAndMetadata createOffsetAndMetadata(long offset) {
            return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
        }

        private final class InitialOrIdleSeekCallback
        implements ConsumerSeekAware.ConsumerSeekCallback {
            InitialOrIdleSeekCallback() {
            }

            @Override
            public void seek(String topic, int partition, long offset) {
                ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offset);
            }

            @Override
            public void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction) {
                ListenerConsumer.this.consumer.seek(new TopicPartition(topic, partition), offsetComputeFunction.apply(ListenerConsumer.this.consumer.position(new TopicPartition(topic, partition))).longValue());
            }

            @Override
            public void seekToBeginning(String topic, int partition) {
                ListenerConsumer.this.consumer.seekToBeginning(Collections.singletonList(new TopicPartition(topic, partition)));
            }

            @Override
            public void seekToBeginning(Collection<TopicPartition> partitions) {
                ListenerConsumer.this.consumer.seekToBeginning(partitions);
            }

            @Override
            public void seekToEnd(String topic, int partition) {
                ListenerConsumer.this.consumer.seekToEnd(Collections.singletonList(new TopicPartition(topic, partition)));
            }

            @Override
            public void seekToEnd(Collection<TopicPartition> partitions) {
                ListenerConsumer.this.consumer.seekToEnd(partitions);
            }

            @Override
            public void seekRelative(String topic, int partition, long offset, boolean toCurrent) {
                TopicPartition topicPart = new TopicPartition(topic, partition);
                Long whereTo = null;
                Consumer consumerToSeek = ListenerConsumer.this.consumer;
                whereTo = offset >= 0L ? this.computeForwardWhereTo(offset, toCurrent, topicPart, consumerToSeek) : this.computeBackwardWhereTo(offset, toCurrent, topicPart, consumerToSeek);
                if (whereTo != null) {
                    consumerToSeek.seek(topicPart, whereTo.longValue());
                }
            }

            @Override
            public void seekToTimestamp(String topic, int partition, long timestamp) {
                Consumer consumerToSeek = ListenerConsumer.this.consumer;
                Map offsetsForTimes = consumerToSeek.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic, partition), timestamp));
                offsetsForTimes.forEach((tp, ot) -> {
                    if (ot != null) {
                        consumerToSeek.seek(tp, ot.offset());
                    }
                });
            }

            @Override
            public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestamp) {
                Consumer consumerToSeek = ListenerConsumer.this.consumer;
                Map<TopicPartition, Long> map = topicParts.stream().collect(Collectors.toMap(tp -> tp, tp -> timestamp));
                Map offsetsForTimes = consumerToSeek.offsetsForTimes(map);
                offsetsForTimes.forEach((tp, ot) -> {
                    if (ot != null) {
                        consumerToSeek.seek(tp, ot.offset());
                    }
                });
            }

            @Nullable
            private Long computeForwardWhereTo(long offset, boolean toCurrent, TopicPartition topicPart, Consumer<K, V> consumerToSeek) {
                Long start;
                if (!toCurrent) {
                    Map beginning = consumerToSeek.beginningOffsets(Collections.singletonList(topicPart));
                    start = (Long)beginning.get(topicPart);
                } else {
                    start = consumerToSeek.position(topicPart);
                }
                if (start != null) {
                    return start + offset;
                }
                return null;
            }

            @Nullable
            private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartition topicPart, Consumer<K, V> consumerToSeek) {
                Long end;
                if (!toCurrent) {
                    Map endings = consumerToSeek.endOffsets(Collections.singletonList(topicPart));
                    end = (Long)endings.get(topicPart);
                } else {
                    end = consumerToSeek.position(topicPart);
                }
                if (end != null) {
                    long newOffset = end + offset;
                    return newOffset < 0L ? 0L : newOffset;
                }
                return null;
            }
        }

        private class ListenerConsumerRebalanceListener
        implements ConsumerRebalanceListener {
            private final ConsumerRebalanceListener userListener;
            private final ConsumerAwareRebalanceListener consumerAwareListener;
            private final Collection<TopicPartition> revoked;

            /**
             * Takes the user's rebalance listener from the container properties and remembers
             * it additionally as a {@code ConsumerAwareRebalanceListener} when it is one.
             */
            ListenerConsumerRebalanceListener() {
                this.userListener = KafkaMessageListenerContainer.this.getContainerProperties().getConsumerRebalanceListener();
                if (this.userListener instanceof ConsumerAwareRebalanceListener aware) {
                    this.consumerAwareListener = aware;
                } else {
                    this.consumerAwareListener = null;
                }
                this.revoked = new LinkedList<TopicPartition>();
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            /**
             * Handles revocation of partitions: remembers them, drops their pending records,
             * notifies the user listener, commits outstanding work, notifies the remaining
             * listeners and finally clears the per-partition state kept by the consumer.
             *
             * @param partitions the partitions being revoked
             */
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                this.revoked.addAll(partitions);
                this.removeRevocationsFromPending(partitions);
                // Notify the user listener before committing.
                if (this.consumerAwareListener != null) {
                    this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer, partitions);
                } else {
                    this.userListener.onPartitionsRevoked(partitions);
                }
                try {
                    ListenerConsumer.this.checkRebalanceCommits();
                    ListenerConsumer.this.commitPendingAcks();
                    ListenerConsumer.this.fixTxOffsetsIfNeeded();
                }
                catch (Exception e) {
                    // A commit failure is logged only; the remaining cleanup still runs.
                    ListenerConsumer.this.logger.error((Throwable)e, () -> "Fatal commit error after revocation " + String.valueOf(partitions));
                }
                if (this.consumerAwareListener != null) {
                    this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer, partitions);
                }
                if (ListenerConsumer.this.consumerSeekAwareListener != null) {
                    ListenerConsumer.this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
                }
                if (ListenerConsumer.this.assignedPartitions != null) {
                    ListenerConsumer.this.assignedPartitions.removeAll(partitions);
                }
                ListenerConsumer.this.pausedForNack.removeAll(partitions);
                partitions.forEach(ListenerConsumer.this.lastCommits::remove);
                ListenerConsumer listenerConsumer = ListenerConsumer.this;
                // offsetsInThisBatch is accessed while holding the ListenerConsumer monitor.
                synchronized (listenerConsumer) {
                    Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
                    if (pendingOffsets != null) {
                        partitions.forEach(tp -> {
                            pendingOffsets.remove(tp);
                            ListenerConsumer.this.deferredOffsets.remove(tp);
                        });
                        // Nothing pending any more: clear the paused flag.
                        if (pendingOffsets.isEmpty()) {
                            ListenerConsumer.this.consumerPaused = false;
                        }
                    }
                }
            }

            /**
             * Drops the records of revoked partitions from {@code remainingRecords}. The
             * field is replaced with a trimmed copy, or set to {@code null} when no partition
             * is left. Nothing happens when there are no remaining records or no partitions
             * were revoked.
             *
             * @param partitions the revoked partitions
             */
            private void removeRevocationsFromPending(Collection<TopicPartition> partitions) {
                ConsumerRecords<K, V> remaining = ListenerConsumer.this.remainingRecords;
                if (remaining == null || partitions.isEmpty()) {
                    return;
                }
                Set<TopicPartition> kept = new LinkedHashSet<>(remaining.partitions());
                kept.removeAll(partitions);
                if (kept.isEmpty()) {
                    ListenerConsumer.this.remainingRecords = null;
                } else {
                    Map<TopicPartition, List<ConsumerRecord<K, V>>> trimmed = new LinkedHashMap<>();
                    for (TopicPartition part : kept) {
                        trimmed.computeIfAbsent(part, tp -> remaining.records(tp));
                    }
                    ListenerConsumer.this.remainingRecords = new ConsumerRecords<>(trimmed);
                }
                ListenerConsumer.this.logger.debug(() -> "Removed " + String.valueOf(partitions) + " from remaining records");
            }

            /**
             * Handles assignment of partitions: re-pauses if needed, records the assignment,
             * optionally commits current positions, and then notifies the seek-aware listener,
             * the user listener and the common error handler.
             *
             * @param partitions the newly assigned partitions
             */
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                this.repauseIfNeeded(partitions);
                ListenerConsumer.this.assignedPartitions.addAll(partitions);
                // Stop here when committing on assignment is enabled and the helper returns false.
                if (ListenerConsumer.this.commitCurrentOnAssignment && !this.collectAndCommitIfNecessary(partitions)) {
                    return;
                }
                if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
                    ListenerConsumer.this.seekPartitions(partitions, false);
                }
                if (this.consumerAwareListener != null) {
                    this.consumerAwareListener.onPartitionsAssigned(ListenerConsumer.this.consumer, partitions);
                } else {
                    this.userListener.onPartitionsAssigned(partitions);
                }
                // Fire onFirstPoll once, only when no partitions were explicitly defined.
                if (!ListenerConsumer.this.firstPoll && ListenerConsumer.this.definedPartitions == null && ListenerConsumer.this.consumerSeekAwareListener != null) {
                    ListenerConsumer.this.firstPoll = true;
                    ListenerConsumer.this.consumerSeekAwareListener.onFirstPoll();
                }
                if (ListenerConsumer.this.commonErrorHandler != null) {
                    // The callback publishes a paused event for the assigned partitions.
                    ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer, partitions, () -> KafkaMessageListenerContainer.this.publishConsumerPausedEvent(partitions, "Paused by error handler after rebalance"));
                }
            }

            /**
             * Re-apply consumer and partition pauses to newly assigned partitions.
             * <p>All given partitions are paused when there are offsets pending in the current
             * batch, a container-level pause was requested, or records remain to be processed.
             * Otherwise only the partitions with an individual pause request are paused.
             * Partitions still paused for a nack are always paused again at the end.
             * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
             * behaviour change" for this method (presumably the monitor-exit handler of the
             * synchronized block) - confirm against the original source.
             * @param partitions the partitions that were just assigned.
             */
            private void repauseIfNeeded(Collection<TopicPartition> partitions) {
                boolean pending = false;
                ListenerConsumer listenerConsumer = ListenerConsumer.this;
                // offsetsInThisBatch is inspected while holding the ListenerConsumer monitor.
                synchronized (listenerConsumer) {
                    if (!ObjectUtils.isEmpty(ListenerConsumer.this.offsetsInThisBatch)) {
                        pending = true;
                    }
                }
                // Whole-consumer pause: applies to every newly assigned partition.
                if ((pending || KafkaMessageListenerContainer.this.isPauseRequested() || ListenerConsumer.this.remainingRecords != null) && !partitions.isEmpty()) {
                    ListenerConsumer.this.consumer.pause(partitions);
                    ListenerConsumer.this.consumerPaused = true;
                    ListenerConsumer.this.logger.warn((CharSequence)"Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records");
                    ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + String.valueOf(partitions));
                    KafkaMessageListenerContainer.this.publishConsumerPausedEvent(partitions, "Re-paused after rebalance");
                }
                // Collect the assigned partitions that have an individual pause request.
                LinkedList<TopicPartition> toRepause = new LinkedList<TopicPartition>();
                partitions.forEach(tp -> {
                    if (KafkaMessageListenerContainer.this.isPartitionPauseRequested((TopicPartition)tp)) {
                        toRepause.add((TopicPartition)tp);
                    }
                });
                // Only needed when the consumer as a whole is not flagged as paused.
                if (!ListenerConsumer.this.consumerPaused && !toRepause.isEmpty()) {
                    ListenerConsumer.this.consumer.pause(toRepause);
                    ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + String.valueOf(toRepause));
                    KafkaMessageListenerContainer.this.publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
                    ListenerConsumer.this.pausedPartitions.addAll(toRepause);
                }
                // Revoked partitions that were not re-paused above are dropped from
                // pausedPartitions; the revoked set is then emptied.
                this.revoked.removeAll(toRepause);
                ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
                this.revoked.clear();
                // Partitions paused because of a nack are paused again unconditionally.
                if (!ListenerConsumer.this.pausedForNack.isEmpty()) {
                    ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
                }
            }

            /**
             * Commit the current position of every given partition that has no committed
             * offset yet.
             * @param partitions the newly assigned partitions.
             * @return {@code false} when a position could not be determined because of a
             * {@link NoOffsetForPartitionException} (the consumer is then flagged with a fatal
             * error and nothing is committed); {@code true} otherwise.
             */
            private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
                Map<TopicPartition, OffsetAndMetadata> initialPositions = new HashMap<>();
                Map<TopicPartition, OffsetAndMetadata> alreadyCommitted = ListenerConsumer.this.consumer.committed(new HashSet<>(partitions));
                for (TopicPartition assigned : partitions) {
                    // Only partitions without an existing commit need their position committed.
                    if (alreadyCommitted.get(assigned) == null) {
                        try {
                            initialPositions.put(assigned, ListenerConsumer.this.createOffsetAndMetadata(ListenerConsumer.this.consumer.position(assigned)));
                        }
                        catch (NoOffsetForPartitionException ex) {
                            ListenerConsumer.this.fatalError = true;
                            ListenerConsumer.this.logger.error((Throwable)ex, (CharSequence)"No offset and no reset policy");
                            return false;
                        }
                    }
                }
                if (!initialPositions.isEmpty()) {
                    this.commitCurrentOffsets(initialPositions);
                }
                return true;
            }

            /**
             * Commit the given offsets as part of a partition assignment.
             * <p>When a transaction template and a Kafka transaction manager are configured and
             * the assignment commit option is not {@code LATEST_ONLY_NO_TX}, each partition's
             * offset is sent in its own transaction through the producer bound to that
             * transaction. Otherwise the offsets are committed through the consumer, either
             * synchronously or asynchronously depending on the container properties.
             * <p>A synchronous commit that fails with a {@link RetriableCommitFailedException}
             * or {@link RebalanceInProgressException} is best effort: the failure is logged at
             * debug level and not propagated.
             * @param offsetsToCommit the offsets to commit, keyed by partition.
             */
            private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
                ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + String.valueOf(offsetsToCommit));
                boolean commitInTransaction = ListenerConsumer.this.transactionTemplate != null
                        && ListenerConsumer.this.kafkaTxManager != null
                        && !ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.equals(ListenerConsumer.this.autoCommitOption);
                if (commitInTransaction) {
                    // One transaction per partition; the callback captures the lambda parameters.
                    offsetsToCommit.forEach((partition, offsetAndMetadata) -> ListenerConsumer.this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {

                        protected void doInTransactionWithoutResult(TransactionStatus status) {
                            KafkaResourceHolder<?, ?> holder = (KafkaResourceHolder<?, ?>) TransactionSynchronizationManager.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory());
                            // Without a producer bound to the transaction there is nothing to send through.
                            if (holder != null) {
                                ListenerConsumer.this.doSendOffsets(holder.getProducer(), Collections.singletonMap(partition, offsetAndMetadata));
                            }
                        }
                    }));
                    return;
                }
                ContainerProperties containerProps = KafkaMessageListenerContainer.this.getContainerProperties();
                if (!containerProps.isSyncCommits()) {
                    ListenerConsumer.this.commitAsync(offsetsToCommit);
                    return;
                }
                try {
                    ListenerConsumer.this.consumer.commitSync(offsetsToCommit, containerProps.getSyncCommitTimeout());
                }
                catch (RetriableCommitFailedException | RebalanceInProgressException ex) {
                    // Deliberately not rethrown: this commit is only a best-effort step of the
                    // assignment. Log it so the skipped commit is at least visible.
                    ListenerConsumer.this.logger.debug(ex, () -> "Ignored failure while committing on assignment: " + offsetsToCommit);
                }
            }

            /**
             * Rebalance callback invoked with partitions that were lost.
             * <p>The loss is first reported to the consumer-aware listener (together with the
             * consumer) or, when there is none, to the plain user listener. The partitions are
             * then processed by {@code onPartitionsRevoked}.
             * @param partitions the partitions that were lost.
             */
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                if (this.consumerAwareListener == null) {
                    this.userListener.onPartitionsLost(partitions);
                } else {
                    this.consumerAwareListener.onPartitionsLost(ListenerConsumer.this.consumer, partitions);
                }
                // Lost partitions go through the same handling as revoked ones.
                this.onPartitionsRevoked(partitions);
            }
        }

        /**
         * {@link Acknowledgment} covering a whole batch of records.
         * <p>Supports acknowledging the complete batch, acknowledging it incrementally up
         * to an index ({@link #acknowledge(int)}) and negatively acknowledging from an index
         * ({@link #nack(int, Duration)}).
         */
        private final class ConsumerBatchAcknowledgment
        implements Acknowledgment {
            private final ConsumerRecords<K, V> records;
            private final List<ConsumerRecord<K, V>> recordList;
            private volatile boolean acked;
            /** Index of the last record committed by a partial acknowledgment; -1 when none. */
            private volatile int partial = -1;

            /**
             * @param records the batch as returned by the consumer; dereferenced
             * unconditionally by {@link #acknowledge()} and {@link #nack(int, Duration)}.
             * @param recordList the batch as a list, or {@code null}; partial acknowledgment
             * is rejected when it is {@code null}.
             */
            ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records, @Nullable List<ConsumerRecord<K, V>> recordList) {
                this.records = records;
                this.recordList = recordList;
            }

            /**
             * Acknowledge the batch.
             * <p>After one or more partial acknowledgments this commits the remainder of the
             * batch (and does nothing when every record was already acknowledged). Otherwise
             * the whole batch is acknowledged once; further calls are no-ops.
             */
            @Override
            public void acknowledge() {
                if (this.partial >= 0) {
                    // partial is only ever set by acknowledge(int), which requires a non-null
                    // recordList, so the list is safe to use here.
                    int lastIndex = this.recordList.size() - 1;
                    if (this.partial < lastIndex) {
                        this.acknowledge(lastIndex);
                    }
                    return;
                }
                if (this.acked) {
                    return;
                }
                Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
                if (offs != null) {
                    Map<TopicPartition, ?> deferred = ListenerConsumer.this.deferredOffsets;
                    for (TopicPartition topicPartition : this.records.partitions()) {
                        offs.remove(topicPartition);
                        deferred.remove(topicPartition);
                    }
                }
                ListenerConsumer.this.processAcks(this.records);
                this.acked = true;
            }

            /**
             * Acknowledge the records after the previous partial acknowledgment, up to and
             * including {@code index}.
             * @param index position in the record list; must be greater than any previously
             * acknowledged index and within the list.
             * @throws IllegalArgumentException if the index is not greater than the previous
             * one or is out of range.
             * @throws IllegalStateException if the ack mode is not manual-immediate, the
             * listener did not receive a list, or the caller is not the consumer thread.
             */
            @Override
            public void acknowledge(int index) {
                Assert.isTrue(index > this.partial, () -> String.format("index (%d) must be greater than the previous partial commit (%d)", index, this.partial));
                Assert.state(ListenerConsumer.this.isManualImmediateAck, "Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
                Assert.state(this.recordList != null, "Listener must receive a List of records to use partial batch acknowledgment");
                Assert.isTrue(index >= 0 && index < this.recordList.size(), () -> String.format("index (%d) is out of range (%d-%d)", index, 0, this.recordList.size() - 1));
                Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "Partial batch acknowledgment is only supported on the consumer thread");
                // Group the newly acknowledged records by partition, preserving encounter order.
                Map<TopicPartition, List<ConsumerRecord<K, V>>> offsetsToCommit = new LinkedHashMap<>();
                for (int i = this.partial + 1; i <= index; ++i) {
                    ConsumerRecord<K, V> cRecord = this.recordList.get(i);
                    offsetsToCommit.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()), tp -> new ArrayList<>()).add(cRecord);
                }
                if (!offsetsToCommit.isEmpty()) {
                    ListenerConsumer.this.processAcks(new ConsumerRecords<>(offsetsToCommit));
                }
                this.partial = index;
            }

            /**
             * Negatively acknowledge the record at {@code index}: the nack index and sleep
             * time are recorded on the consumer and the records before the index are
             * acknowledged.
             * @param index position, in batch iteration order, of the first record not to
             * acknowledge.
             * @param sleep the nack sleep duration; must not be negative.
             * @throws IllegalStateException if not called on the consumer thread or when
             * out-of-order commits are in use.
             * @throws IllegalArgumentException if {@code sleep} is negative or the index is
             * out of bounds.
             */
            @Override
            public void nack(int index, Duration sleep) {
                Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread");
                Assert.state(!ListenerConsumer.this.asyncReplies, "nack() is not supported with out-of-order commits");
                Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
                Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
                ListenerConsumer.this.nackIndex = index;
                ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
                // Everything before the nacked index is acknowledged, grouped by partition.
                Map<TopicPartition, List<ConsumerRecord<K, V>>> newRecords = new HashMap<>();
                int position = 0;
                for (ConsumerRecord<K, V> cRecord : this.records) {
                    if (position++ >= index) {
                        break;
                    }
                    newRecords.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()), tp -> new ArrayList<>()).add(cRecord);
                }
                ListenerConsumer.this.processAcks(new ConsumerRecords<>(newRecords));
            }

            @Override
            public boolean isOutOfOrderCommit() {
                return ListenerConsumer.this.asyncReplies;
            }

            @Override
            public String toString() {
                return "Acknowledgment for " + this.records;
            }
        }

        /**
         * {@link Acknowledgment} bound to a single consumer record.
         */
        private final class ConsumerAcknowledgment
        implements Acknowledgment {
            private final ConsumerRecord<K, V> cRecord;
            private volatile boolean acked;

            /**
             * @param cRecord the record this acknowledgment refers to.
             */
            ConsumerAcknowledgment(ConsumerRecord<K, V> cRecord) {
                this.cRecord = cRecord;
            }

            /**
             * Acknowledge the record; only the first call has an effect.
             */
            @Override
            public void acknowledge() {
                if (this.acked) {
                    return;
                }
                ListenerConsumer.this.doAck(this.cRecord);
                this.acked = true;
            }

            /**
             * Negatively acknowledge the record by recording the requested sleep time on
             * the consumer.
             * @param sleep the nack sleep duration; must not be negative.
             * @throws IllegalStateException if not called on the consumer thread or when
             * out-of-order commits are in use.
             * @throws IllegalArgumentException if {@code sleep} is negative.
             */
            @Override
            public void nack(Duration sleep) {
                Thread caller = Thread.currentThread();
                Assert.state(caller.equals(ListenerConsumer.this.consumerThread), "nack() can only be called on the consumer thread");
                Assert.state(!ListenerConsumer.this.asyncReplies, "nack() is not supported with out-of-order commits");
                Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
                ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
            }

            @Override
            public boolean isOutOfOrderCommit() {
                return ListenerConsumer.this.asyncReplies;
            }

            @Override
            public String toString() {
                String description = KafkaUtils.format(this.cRecord);
                return "Acknowledgment for " + description;
            }
        }
    }

    /**
     * Completion handler that logs the outcome of stopping the container and then runs
     * an optional callback.
     */
    private class StopCallback
    implements BiConsumer<Object, Throwable> {
        private final Runnable callback;

        /**
         * @param callback run after the outcome has been logged; skipped when {@code null}.
         */
        StopCallback(Runnable callback) {
            this.callback = callback;
        }

        /**
         * Log the stop outcome (error when a throwable is present, debug otherwise) and
         * run the callback, if any, in both cases.
         * @param result the completion result; not used.
         * @param throwable the failure, or {@code null} when the stop completed normally.
         */
        @Override
        public void accept(Object result, @Nullable Throwable throwable) {
            if (throwable == null) {
                KafkaMessageListenerContainer.this.logger.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
            } else {
                KafkaMessageListenerContainer.this.logger.error(throwable, (CharSequence)"Error while stopping the container");
            }
            Runnable afterStop = this.callback;
            if (afterStop != null) {
                afterStop.run();
            }
        }
    }

    /**
     * {@link KafkaException} subtype that carries a message and a cause.
     * <p>NOTE(review): the name suggests it signals that the container should stop after
     * a producer was fenced - confirm against the throw sites, which are outside this
     * section of the file.
     */
    private static class StopAfterFenceException
    extends KafkaException {
        /**
         * @param message the detail message.
         * @param cause the underlying cause.
         */
        StopAfterFenceException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Pairs a consumer record with a runtime exception.
     * <p>NOTE(review): presumably the exception is the one raised while processing the
     * record - confirm against the usages, which are outside this section of the file.
     * @param <K> the record key type.
     * @param <V> the record value type.
     * @param record the consumer record.
     * @param ex the exception associated with the record.
     */
    private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) {
    }

    /**
     * Value holder grouping an offset, a "relative to current" flag and a seek position.
     * <p>NOTE(review): presumably describes a pending seek request for a partition -
     * confirm against the usages, which are outside this section of the file.
     * @param offset the offset (boxed, so it may be {@code null} - TODO confirm when).
     * @param relativeToCurrent flag; by its name, whether the offset is relative to the
     * current position - confirm against the usages.
     * @param seekPosition the {@link TopicPartitionOffset.SeekPosition} value.
     */
    private record OffsetMetadata(Long offset, boolean relativeToCurrent, TopicPartitionOffset.SeekPosition seekPosition) {
    }
}

