/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.WakeupTrigger;
import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class AsyncKafkaConsumer<K, V>
implements ConsumerDelegate<K, V> {
    /** Initial value of {@link #currentThread}. */
    private static final long NO_CURRENT_THREAD = -1L;
    private final ApplicationEventHandler applicationEventHandler;
    private final Time time;
    /**
     * Group metadata; populated by the constructors via {@code initializeGroupMetadata} and
     * replaced atomically by {@code updateGroupMetadata}. Empty when no group id is configured.
     */
    private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata = new AtomicReference<>(Optional.empty());
    private final KafkaConsumerMetrics kafkaConsumerMetrics;
    /** Not final: the config-based constructor tests it for {@code null} in its failure path. */
    private Logger log;
    private final String clientId;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final BackgroundEventProcessor backgroundEventProcessor;
    private final CompletableEventReaper backgroundEventReaper;
    private final Deserializers<K, V> deserializers;
    private final FetchBuffer fetchBuffer;
    private final FetchCollector<K, V> fetchCollector;
    private final ConsumerInterceptors<K, V> interceptors;
    private final IsolationLevel isolationLevel;
    private final SubscriptionState subscriptions;
    private final ConsumerMetadata metadata;
    /** Value of {@code metadata.updateVersion()} captured at construction time. */
    private int metadataVersionSnapshot;
    private final Metrics metrics;
    private final long retryBackoffMs;
    /** Timeout, in milliseconds, used by the overloads that do not take an explicit {@link Duration}. */
    private final int defaultApiTimeoutMs;
    private final boolean autoCommitEnabled;
    private volatile boolean closed = false;
    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
    private boolean cachedSubscriptionHasAllFetchPositions;
    private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
    private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    /** Set to {@code true} when an asynchronous commit completes with a {@code FencedInstanceIdException}. */
    private final AtomicBoolean asyncCommitFenced;
    /** Future returned by the most recent {@code commitAsync(offsets, callback)} call, or {@code null} if none. */
    private CompletableFuture<Void> lastPendingAsyncCommit = null;
    // NOTE(review): presumably the id of the thread currently inside the consumer - confirm against acquire()/release().
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);
    private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;

    /**
     * Creates a consumer from configuration, delegating to the factory-based constructor with
     * the system clock, constructor references for each collaborator factory, and a fresh
     * background event queue.
     *
     * @param config            consumer configuration
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     */
    AsyncKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(
            config,
            keyDeserializer,
            valueDeserializer,
            Time.SYSTEM,
            ApplicationEventHandler::new,
            CompletableEventReaper::new,
            FetchCollector::new,
            ConsumerMetadata::new,
            new LinkedBlockingQueue<>());
    }

    /**
     * Builds the consumer from configuration, creating metrics, metadata, subscription state,
     * the fetch pipeline and the application/background event plumbing. Collaborators that
     * have a factory parameter are created through that factory.
     *
     * <p>If anything thrown during construction is caught, the consumer is closed (when the
     * logger has already been created) and the failure is rethrown wrapped in a
     * {@link KafkaException}.
     *
     * @param config                         consumer configuration
     * @param keyDeserializer                deserializer for record keys
     * @param valueDeserializer              deserializer for record values
     * @param time                           clock used for timers and metrics
     * @param applicationEventHandlerFactory factory for the {@link ApplicationEventHandler}
     * @param backgroundEventReaperFactory   factory for the background {@link CompletableEventReaper}
     * @param fetchCollectorFactory          factory for the {@link FetchCollector}
     * @param metadataFactory                factory for the {@link ConsumerMetadata}
     * @param backgroundEventQueue           queue from which background events are read
     * @throws KafkaException if construction fails for any reason
     */
    AsyncKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Time time, ApplicationEventHandlerFactory applicationEventHandlerFactory, CompletableEventReaperFactory backgroundEventReaperFactory, FetchCollectorFactory<K, V> fetchCollectorFactory, ConsumerMetadataFactory metadataFactory, LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
        try {
            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);
            this.clientId = config.getString("client.id");
            this.autoCommitEnabled = config.getBoolean("enable.auto.commit");
            LogContext logContext = ConsumerUtils.createLogContext(config, groupRebalanceConfig);
            this.backgroundEventQueue = backgroundEventQueue;
            // The logger is created early; the catch block below uses its presence to decide whether to call close().
            this.log = logContext.logger(this.getClass());
            this.log.debug("Initializing the Kafka consumer");
            this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms");
            this.time = time;
            // Metrics reporters, plus the telemetry reporter when one is configured.
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, (AbstractConfig)config);
            this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(this.clientId, config);
            this.clientTelemetryReporter.ifPresent(reporters::add);
            this.metrics = ConsumerUtils.createMetrics(config, time, reporters);
            this.retryBackoffMs = config.getLong("retry.backoff.ms");
            // NOTE(review): raw List / raw ConsumerInterceptors are decompiler artifacts; the element type is not visible here.
            List interceptorList = ConsumerUtils.configuredConsumerInterceptors(config);
            this.interceptors = new ConsumerInterceptors(interceptorList);
            this.deserializers = new Deserializers<K, V>(config, keyDeserializer, valueDeserializer);
            this.subscriptions = ConsumerUtils.createSubscriptionState(config, logContext);
            // Reporters, interceptors and both deserializers are the candidates for cluster resource listeners.
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(this.metrics.reporters(), interceptorList, Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
            this.metadata = metadataFactory.build(config, this.subscriptions, logContext, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
            this.metadata.bootstrap(addresses);
            this.metadataVersionSnapshot = this.metadata.updateVersion();
            FetchMetricsManager fetchMetricsManager = ConsumerUtils.createFetchMetricsManager(this.metrics);
            FetchConfig fetchConfig = new FetchConfig(config);
            this.isolationLevel = fetchConfig.isolationLevel;
            ApiVersions apiVersions = new ApiVersions();
            LinkedBlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<ApplicationEvent>();
            BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
            this.fetchBuffer = new FetchBuffer(logContext);
            // The network client, request managers and event processor are passed on as suppliers rather than instances.
            Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time, logContext, this.metadata, config, apiVersions, this.metrics, fetchMetricsManager.throttleTimeSensor(), this.clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler);
            this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(this.interceptors);
            this.asyncCommitFenced = new AtomicBoolean(false);
            this.groupMetadata.set(this.initializeGroupMetadata(config, groupRebalanceConfig));
            Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventHandler, this.metadata, this.subscriptions, this.fetchBuffer, config, groupRebalanceConfig, apiVersions, fetchMetricsManager, networkClientDelegateSupplier, this.clientTelemetryReporter, this.metrics, this.offsetCommitCallbackInvoker, this::updateGroupMetadata);
            Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, this.metadata, this.subscriptions, requestManagersSupplier);
            this.applicationEventHandler = applicationEventHandlerFactory.build(logContext, time, applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier);
            ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(logContext, this.subscriptions, time, new RebalanceCallbackMetricsManager(this.metrics));
            this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker);
            this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
            this.fetchCollector = fetchCollectorFactory.build(logContext, this.metadata, this.subscriptions, fetchConfig, this.deserializers, fetchMetricsManager, time);
            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, "consumer");
            // With a group configured and the CONSUMER group protocol selected, "group.remote.assignor"
            // is marked as ignored before unused configs are logged.
            if (this.groupMetadata.get().isPresent() && GroupProtocol.of(config.getString("group.protocol")) == GroupProtocol.CONSUMER) {
                config.ignore("group.remote.assignor");
            }
            config.logUnused();
            AppInfoParser.registerAppInfo("kafka.consumer", this.clientId, this.metrics, time.milliseconds(), ConfigUtils.getDoLog(config));
            this.log.debug("Kafka consumer initialized");
        }
        catch (Throwable t2) {
            // A null logger means the failure happened before the logger was assigned, so close() is skipped.
            if (this.log != null) {
                this.close(Duration.ZERO, true);
            }
            throw new KafkaException("Failed to construct kafka consumer", t2);
        }
    }

    /**
     * Builds a consumer from already-constructed collaborators. Nothing is read from a
     * {@code ConsumerConfig}: the isolation level is fixed to {@code READ_UNCOMMITTED}, no
     * telemetry reporter is installed, and the group metadata is derived from {@code groupId}
     * with no group instance id.
     *
     * <p>NOTE(review): presumably intended for tests that inject mocks - confirm against callers.
     *
     * @throws NullPointerException    if {@code interceptors} is {@code null}
     * @throws InvalidGroupIdException if {@code groupId} is an empty string
     */
    AsyncKafkaConsumer(LogContext logContext, String clientId, Deserializers<K, V> deserializers, FetchBuffer fetchBuffer, FetchCollector<K, V> fetchCollector, ConsumerInterceptors<K, V> interceptors, Time time, ApplicationEventHandler applicationEventHandler, BlockingQueue<BackgroundEvent> backgroundEventQueue, CompletableEventReaper backgroundEventReaper, ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, Metrics metrics, SubscriptionState subscriptions, ConsumerMetadata metadata, long retryBackoffMs, int defaultApiTimeoutMs, String groupId, boolean autoCommitEnabled) {
        this.log = logContext.logger(this.getClass());
        this.subscriptions = subscriptions;
        this.clientId = clientId;
        this.fetchBuffer = fetchBuffer;
        this.fetchCollector = fetchCollector;
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        this.interceptors = Objects.requireNonNull(interceptors);
        this.time = time;
        this.backgroundEventQueue = backgroundEventQueue;
        this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker);
        this.backgroundEventReaper = backgroundEventReaper;
        this.metrics = metrics;
        // A null groupId yields empty group metadata; an empty string is rejected.
        this.groupMetadata.set(this.initializeGroupMetadata(groupId, Optional.empty()));
        this.metadata = metadata;
        this.metadataVersionSnapshot = metadata.updateVersion();
        this.retryBackoffMs = retryBackoffMs;
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
        this.deserializers = deserializers;
        this.applicationEventHandler = applicationEventHandler;
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
        this.clientTelemetryReporter = Optional.empty();
        this.autoCommitEnabled = autoCommitEnabled;
        this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
        this.asyncCommitFenced = new AtomicBoolean(false);
    }

    /**
     * Builds a consumer around a caller-supplied {@link KafkaClient}, {@link SubscriptionState}
     * and {@link ConsumerMetadata}. Remaining collaborators are created here from {@code config}.
     * The isolation level is fixed to {@code READ_UNCOMMITTED}, no interceptors are configured
     * and no telemetry reporter is installed.
     *
     * <p>NOTE(review): presumably intended for tests that inject a mock network client - confirm
     * against callers.
     *
     * @param logContext        log context used for every logger created here
     * @param time              clock used for timers and metrics
     * @param config            consumer configuration
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param client            network client wrapped by the {@link NetworkClientDelegate}
     * @param subscriptions     subscription state shared with the background components
     * @param metadata          cluster metadata shared with the background components
     * @throws InvalidGroupIdException if the configured group id is an empty string
     */
    AsyncKafkaConsumer(LogContext logContext, Time time, ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, KafkaClient client, SubscriptionState subscriptions, ConsumerMetadata metadata) {
        this.log = logContext.logger(this.getClass());
        this.subscriptions = subscriptions;
        this.clientId = config.getString("client.id");
        this.autoCommitEnabled = config.getBoolean("enable.auto.commit");
        this.fetchBuffer = new FetchBuffer(logContext);
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        // Diamond instead of the raw type, so the assignment to ConsumerInterceptors<K, V> is checked.
        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
        this.time = time;
        this.metrics = new Metrics(time);
        this.metadata = metadata;
        this.metadataVersionSnapshot = metadata.updateVersion();
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms");
        this.deserializers = new Deserializers<K, V>(keyDeserializer, valueDeserializer);
        this.clientTelemetryReporter = Optional.empty();
        ConsumerMetrics metricsRegistry = new ConsumerMetrics("consumer");
        FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(this.metrics, metricsRegistry.fetcherMetrics);
        this.fetchCollector = new FetchCollector<K, V>(logContext, metadata, subscriptions, new FetchConfig(config), this.deserializers, fetchMetricsManager, time);
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, "consumer");
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);
        this.groupMetadata.set(this.initializeGroupMetadata(config, groupRebalanceConfig));
        LinkedBlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<ApplicationEvent>();
        this.backgroundEventQueue = new LinkedBlockingQueue<BackgroundEvent>();
        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(this.backgroundEventQueue);
        ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(logContext, subscriptions, time, new RebalanceCallbackMetricsManager(this.metrics));
        ApiVersions apiVersions = new ApiVersions();
        // The supplied client is wrapped lazily: the delegate is only created when the supplier is invoked.
        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler);
        this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(this.interceptors);
        this.asyncCommitFenced = new AtomicBoolean(false);
        Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventHandler, metadata, subscriptions, this.fetchBuffer, config, groupRebalanceConfig, apiVersions, fetchMetricsManager, networkClientDelegateSupplier, this.clientTelemetryReporter, this.metrics, this.offsetCommitCallbackInvoker, this::updateGroupMetadata);
        Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, subscriptions, requestManagersSupplier);
        this.applicationEventHandler = new ApplicationEventHandler(logContext, time, applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier);
        this.backgroundEventProcessor = new BackgroundEventProcessor(rebalanceListenerInvoker);
        this.backgroundEventReaper = new CompletableEventReaper(logContext);
    }

    /**
     * Derives the initial group metadata from the rebalance configuration. When no group id is
     * configured, the two group-only settings below are marked as ignored on {@code config}.
     *
     * @param config               consumer configuration on which settings may be marked ignored
     * @param groupRebalanceConfig source of the group id and group instance id
     * @return the initial group metadata, or empty when no group id is configured
     * @throws InvalidGroupIdException if the configured group id is an empty string
     */
    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(ConsumerConfig config, GroupRebalanceConfig groupRebalanceConfig) {
        final String configuredGroupId = groupRebalanceConfig.groupId;
        final Optional<String> configuredInstanceId = groupRebalanceConfig.groupInstanceId;
        final Optional<ConsumerGroupMetadata> resolved = this.initializeGroupMetadata(configuredGroupId, configuredInstanceId);
        if (resolved.isPresent()) {
            return resolved;
        }
        // No group: these settings have no effect, so mark them as ignored.
        config.ignore("auto.commit.interval.ms");
        config.ignore("internal.throw.on.fetch.stable.offset.unsupported");
        return resolved;
    }

    /**
     * Builds the initial group metadata for the given group id.
     *
     * @param groupId         configured group id; {@code null} means the consumer has no group
     * @param groupInstanceId optional static-membership instance id
     * @return empty when {@code groupId} is {@code null}, otherwise the initial metadata
     * @throws InvalidGroupIdException if {@code groupId} is an empty string
     */
    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(String groupId, Optional<String> groupInstanceId) {
        if (groupId == null) {
            return Optional.empty();
        }
        // NOTE(review): the message mentions whitespace, but only the empty string is rejected here.
        if (groupId.isEmpty()) {
            throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace.");
        }
        final ConsumerGroupMetadata initial = this.initializeConsumerGroupMetadata(groupId, groupInstanceId);
        return Optional.of(initial);
    }

    /**
     * Creates group metadata for a consumer that has not yet been given a generation or member
     * id: the generation id starts at {@code -1} and the member id as the empty string.
     *
     * @param groupId         group id
     * @param groupInstanceId optional group instance id
     * @return the initial group metadata
     */
    private ConsumerGroupMetadata initializeConsumerGroupMetadata(String groupId, Optional<String> groupInstanceId) {
        final int initialGenerationId = -1;
        final String initialMemberId = "";
        return new ConsumerGroupMetadata(groupId, initialGenerationId, initialMemberId, groupInstanceId);
    }

    /**
     * Atomically replaces the current group metadata with a copy carrying the supplied member
     * epoch and member id. A value that is absent keeps the corresponding value from the current
     * metadata. If no group metadata is present, nothing changes.
     *
     * @param memberEpoch new generation id, if any
     * @param memberId    new member id, if any
     */
    private void updateGroupMetadata(Optional<Integer> memberEpoch, Optional<String> memberId) {
        this.groupMetadata.updateAndGet(current -> current.map(previous -> {
            final int generationId = memberEpoch.orElse(previous.generationId());
            final String resolvedMemberId = memberId.orElse(previous.memberId());
            return new ConsumerGroupMetadata(previous.groupId(), generationId, resolvedMemberId, previous.groupInstanceId());
        }));
    }

    /**
     * Polls for records until a non-empty fetch is available or {@code timeout} expires.
     *
     * <p>Each iteration enqueues a {@link PollEvent}, honours a pending wakeup, updates
     * assignment metadata and then polls for fetches. A fetch that is non-empty but carries no
     * records is still returned (as an empty record set), because a position has advanced.
     *
     * @param timeout maximum time to block
     * @return the fetched records after interceptor processing, or an empty record set when the
     *         timer expires first
     * @throws IllegalStateException if the consumer has neither a subscription nor an assignment
     */
    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        Timer timer = this.time.timer(timeout);
        this.acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            do {
                this.applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
                this.wakeupTrigger.maybeTriggerWakeup();
                this.updateAssignmentMetadataIfNeeded(timer);
                Fetch<K, V> fetch = this.pollForFetches(timer);
                if (!fetch.isEmpty()) {
                    if (fetch.records().isEmpty()) {
                        this.log.trace("Returning empty records from `poll()` since the consumer's position has advanced for at least one topic partition");
                    }
                    return this.interceptors.onConsume(new ConsumerRecords<K, V>(fetch.records()));
                }
            } while (timer.notExpired());
            // Returned directly so the type arguments are inferred instead of going through a raw local.
            return ConsumerRecords.empty();
        }
        finally {
            // Always record the end of the poll and release the consumer, even on failure.
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            this.release();
        }
    }

    /**
     * Synchronously commits using the default API timeout.
     */
    @Override
    public void commitSync() {
        final Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        this.commitSync(defaultTimeout);
    }

    /**
     * Asynchronously commits the consumed offsets without a user callback.
     */
    @Override
    public void commitAsync() {
        final OffsetCommitCallback noCallback = null;
        this.commitAsync(noCallback);
    }

    /**
     * Asynchronously commits everything reported as consumed by the subscription state.
     *
     * @param callback invoked with the commit outcome; may be {@code null}
     */
    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        final Map<TopicPartition, OffsetAndMetadata> consumed = this.subscriptions.allConsumed();
        this.commitAsync(consumed, callback);
    }

    /**
     * Asynchronously commits the given offsets and remembers the resulting future as the last
     * pending async commit.
     *
     * <p>On completion: a successful commit enqueues the interceptor invocation; a
     * {@link FencedInstanceIdException} marks the consumer as fenced for async commits. The
     * outcome is then handed to {@code callback}, or, when there is no callback, a failure is
     * logged.
     *
     * @param offsets  offsets to commit
     * @param callback invoked with the commit outcome; may be {@code null}
     */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.acquireAndEnsureOpen();
        try {
            final AsyncCommitEvent event = new AsyncCommitEvent(offsets);
            final CompletableFuture<Void> pending = this.commit(event);
            this.lastPendingAsyncCommit = pending.whenComplete((ignored, error) -> {
                if (error == null) {
                    this.offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
                } else if (error instanceof FencedInstanceIdException) {
                    this.asyncCommitFenced.set(true);
                }
                if (callback != null) {
                    this.offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(callback, offsets, (Exception)error);
                } else if (error != null) {
                    // Nobody to notify, so at least leave a trace of the failure.
                    this.log.error("Offset commit with offsets {} failed", (Object)offsets, error);
                }
            });
        }
        finally {
            this.release();
        }
    }

    /**
     * Shared commit path: checks the group-id and fencing preconditions, runs pending commit
     * callbacks, updates the last-seen epochs for the offsets being committed, and hands the
     * event to the application event handler.
     *
     * @param commitEvent event carrying the offsets to commit
     * @return the event's future, or an already-completed future when there is nothing to commit
     */
    private CompletableFuture<Void> commit(CommitEvent commitEvent) {
        this.maybeThrowInvalidGroupIdException();
        this.maybeThrowFencedInstanceException();
        this.offsetCommitCallbackInvoker.executeCallbacks();

        final Map<TopicPartition, OffsetAndMetadata> toCommit = commitEvent.offsets();
        this.log.debug("Committing offsets: {}", toCommit);
        toCommit.forEach(this::updateLastSeenEpochIfNewer);
        if (!toCommit.isEmpty()) {
            this.applicationEventHandler.add(commitEvent);
            return commitEvent.future();
        }
        // Nothing to send, so the event is never enqueued.
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Overrides the fetch position of {@code partition} with {@code offset}. The new position
     * carries no offset epoch and the leader currently known to the metadata.
     *
     * @param partition partition to seek
     * @param offset    non-negative offset to seek to
     * @throws IllegalArgumentException if {@code offset} is negative
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        this.acquireAndEnsureOpen();
        try {
            this.log.info("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            final Metadata.LeaderAndEpoch leader = this.metadata.currentLeader(partition);
            this.subscriptions.seekUnvalidated(partition, new SubscriptionState.FetchPosition(offset, Optional.empty(), leader));
        }
        finally {
            this.release();
        }
    }

    /**
     * Overrides the fetch position of {@code partition} with the offset and optional leader
     * epoch carried by {@code offsetAndMetadata}, and updates the last-seen epoch for the
     * partition before applying the seek.
     *
     * @param partition         partition to seek
     * @param offsetAndMetadata offset (must be non-negative) and optional leader epoch
     * @throws IllegalArgumentException if the offset is negative
     */
    @Override
    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        final long offset = offsetAndMetadata.offset();
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        this.acquireAndEnsureOpen();
        try {
            final Optional<Integer> requestedEpoch = offsetAndMetadata.leaderEpoch();
            if (!requestedEpoch.isPresent()) {
                this.log.info("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            } else {
                this.log.info("Seeking to offset {} for partition {} with epoch {}", new Object[]{offset, partition, requestedEpoch.get()});
            }
            final Metadata.LeaderAndEpoch leader = this.metadata.currentLeader(partition);
            final SubscriptionState.FetchPosition target = new SubscriptionState.FetchPosition(offset, requestedEpoch, leader);
            this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
            this.subscriptions.seekUnvalidated(partition, target);
        }
        finally {
            this.release();
        }
    }

    /**
     * Requests an offset reset to the earliest offset for the given partitions, or for every
     * assigned partition when the collection is empty.
     *
     * @param partitions partitions to reset; empty means all assigned partitions
     * @throws IllegalArgumentException if {@code partitions} is {@code null}
     */
    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        this.acquireAndEnsureOpen();
        try {
            Collection<TopicPartition> targets = partitions;
            if (targets.isEmpty()) {
                targets = this.subscriptions.assignedPartitions();
            }
            this.subscriptions.requestOffsetReset(targets, OffsetResetStrategy.EARLIEST);
        }
        finally {
            this.release();
        }
    }

    /**
     * Requests an offset reset to the latest offset for the given partitions, or for every
     * assigned partition when the collection is empty.
     *
     * @param partitions partitions to reset; empty means all assigned partitions
     * @throws IllegalArgumentException if {@code partitions} is {@code null}
     */
    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        this.acquireAndEnsureOpen();
        try {
            Collection<TopicPartition> targets = partitions;
            if (targets.isEmpty()) {
                targets = this.subscriptions.assignedPartitions();
            }
            this.subscriptions.requestOffsetReset(targets, OffsetResetStrategy.LATEST);
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns the position of {@code partition}, waiting up to the default API timeout.
     *
     * @param partition partition to query
     * @return the current position
     */
    @Override
    public long position(TopicPartition partition) {
        final Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.position(partition, defaultTimeout);
    }

    /**
     * Returns the offset of the valid position of {@code partition}. While no valid position is
     * available, fetch positions are updated and a pending wakeup is honoured, until
     * {@code timeout} expires.
     *
     * @param partition partition to query; must be assigned to this consumer
     * @param timeout   maximum time to wait for a valid position
     * @return the offset of the valid position
     * @throws IllegalStateException if {@code partition} is not assigned
     * @throws TimeoutException      if no valid position is available before the timeout expires
     */
    @Override
    public long position(TopicPartition partition, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(partition)) {
                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
            }
            final Timer timer = this.time.timer(timeout);
            do {
                final SubscriptionState.FetchPosition current = this.subscriptions.validPosition(partition);
                if (current != null) {
                    return current.offset;
                }
                this.updateFetchPositions(timer);
                timer.update();
                this.wakeupTrigger.maybeTriggerWakeup();
            } while (timer.notExpired());
            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position for partition " + partition + " could be determined");
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns the committed offset of a single partition, using the default API timeout.
     *
     * @param partition partition to query
     * @return the map entry for {@code partition}, which may be {@code null}
     */
    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition partition) {
        final Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.committed(partition, defaultTimeout);
    }

    /**
     * Returns the committed offset of a single partition by delegating to the set-based
     * overload with a singleton set.
     *
     * @param partition partition to query
     * @param timeout   maximum time to wait
     * @return the map entry for {@code partition}, which may be {@code null}
     */
    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
        final Map<TopicPartition, OffsetAndMetadata> byPartition = this.committed(Collections.singleton(partition), timeout);
        return byPartition.get(partition);
    }

    /**
     * Returns the committed offsets of the given partitions, using the default API timeout.
     *
     * @param partitions partitions to query
     * @return committed offsets keyed by partition
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        final Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.committed(partitions, defaultTimeout);
    }

    /*
     * Loose catch block
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        this.acquireAndEnsureOpen();
        long start = this.time.nanoseconds();
        try {
            this.maybeThrowInvalidGroupIdException();
            if (partitions.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> map = Collections.emptyMap();
                return map;
            }
            FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent(partitions, CompletableEvent.calculateDeadlineMs(this.time, timeout));
            this.wakeupTrigger.setActiveTask(event.future());
            try {
                Map<TopicPartition, OffsetAndMetadata> committedOffsets = this.applicationEventHandler.addAndGet(event);
                committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
                Map<TopicPartition, OffsetAndMetadata> map = committedOffsets;
                return map;
            }
            catch (TimeoutException e) {
                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last committed offset for partitions " + partitions + " could be determined. Try tuning " + "default.api.timeout.ms" + " larger to relax the threshold.");
            }
            finally {
                this.wakeupTrigger.clearTask();
            }
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            this.kafkaConsumerMetrics.recordCommitted(this.time.nanoseconds() - start);
            this.release();
        }
    }

    /**
     * Guards group-dependent operations: does nothing when group metadata is present and
     * otherwise throws.
     *
     * @throws InvalidGroupIdException if {@code groupMetadata} currently holds an empty value
     */
    private void maybeThrowInvalidGroupIdException() {
        if (this.groupMetadata.get().isPresent()) {
            return;
        }
        throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.");
    }

    /**
     * Returns a read-only view of the map obtained from {@code this.metrics.metrics()}.
     *
     * @return an unmodifiable wrapper around the underlying metrics map
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    /**
     * Returns partition information for {@code topic}, delegating to
     * {@link #partitionsFor(String, Duration)} with a timeout of {@code defaultApiTimeoutMs}
     * milliseconds.
     *
     * @param topic the topic name
     * @return the list produced by the timed overload
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.partitionsFor(topic, defaultTimeout);
    }

    /**
     * Returns partition information for {@code topic}.
     *
     * <p>The cluster snapshot from {@code metadata.fetch()} is consulted first; if it already
     * lists partitions for the topic they are returned directly. Otherwise a
     * {@link TopicMetadataEvent} is submitted and awaited.
     *
     * @param topic   the topic name
     * @param timeout used to derive the event deadline; a value of zero milliseconds fails fast
     * @return the partitions for {@code topic}, or an empty list if the response has no entry for it
     * @throws TimeoutException if the snapshot has no partitions and {@code timeout.toMillis()} is 0
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            List<PartitionInfo> knownPartitions = this.metadata.fetch().partitionsForTopic(topic);
            if (!knownPartitions.isEmpty()) {
                return knownPartitions;
            }
            if (timeout.toMillis() == 0L) {
                throw new TimeoutException();
            }
            TopicMetadataEvent metadataEvent = new TopicMetadataEvent(topic, CompletableEvent.calculateDeadlineMs(this.time, timeout));
            this.wakeupTrigger.setActiveTask(metadataEvent.future());
            try {
                Map<String, List<PartitionInfo>> fetchedMetadata = this.applicationEventHandler.addAndGet(metadataEvent);
                return fetchedMetadata.getOrDefault(topic, Collections.emptyList());
            }
            finally {
                // Cleared on both the normal and the exceptional path.
                this.wakeupTrigger.clearTask();
            }
        }
        finally {
            this.release();
        }
    }

    /**
     * Lists topic metadata, delegating to {@link #listTopics(Duration)} with a timeout of
     * {@code defaultApiTimeoutMs} milliseconds.
     *
     * @return the map produced by the timed overload
     */
    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.listTopics(defaultTimeout);
    }

    /**
     * Lists topic metadata by submitting an {@link AllTopicsMetadataEvent} and waiting for its result.
     *
     * @param timeout used to derive the event deadline; a value of zero milliseconds fails fast
     * @return the map returned by the application event handler
     * @throws TimeoutException if {@code timeout.toMillis()} is 0
     */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            if (timeout.toMillis() == 0L) {
                throw new TimeoutException();
            }
            AllTopicsMetadataEvent allTopicsEvent = new AllTopicsMetadataEvent(CompletableEvent.calculateDeadlineMs(this.time, timeout));
            this.wakeupTrigger.setActiveTask(allTopicsEvent.future());
            try {
                return this.applicationEventHandler.addAndGet(allTopicsEvent);
            }
            finally {
                // Cleared on both the normal and the exceptional path.
                this.wakeupTrigger.clearTask();
            }
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns a read-only view of {@code subscriptions.pausedPartitions()}.
     *
     * @return an unmodifiable set wrapper
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public Set<TopicPartition> paused() {
        this.acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.pausedPartitions());
        }
        finally {
            this.release();
        }
    }

    /**
     * Marks every partition in {@code partitions} as paused in the subscription state.
     *
     * @param partitions the partitions to pause
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            this.log.debug("Pausing partitions {}", partitions);
            partitions.forEach(toPause -> this.subscriptions.pause(toPause));
        }
        finally {
            this.release();
        }
    }

    /**
     * Marks every partition in {@code partitions} as resumed in the subscription state.
     *
     * @param partitions the partitions to resume
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public void resume(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            this.log.debug("Resuming partitions {}", partitions);
            partitions.forEach(toResume -> this.subscriptions.resume(toResume));
        }
        finally {
            this.release();
        }
    }

    /**
     * Looks up offsets by timestamp, delegating to {@link #offsetsForTimes(Map, Duration)} with a
     * timeout of {@code defaultApiTimeoutMs} milliseconds.
     *
     * @param timestampsToSearch target timestamp per partition
     * @return the map produced by the timed overload
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.offsetsForTimes(timestampsToSearch, defaultTimeout);
    }

    /**
     * Looks up, for each partition, the offset associated with the requested timestamp.
     *
     * <p>Every target timestamp is validated to be non-negative before any work is done. With a
     * timeout of zero milliseconds the {@link ListOffsetsEvent} is only enqueued and the event's
     * {@code emptyResults()} is returned without waiting.
     *
     * @param timestampsToSearch target timestamp per partition; must not be {@code null}
     * @param timeout            used to derive the event deadline
     * @return the per-partition results converted via {@code buildOffsetAndTimestamp()}, or an
     *         empty map when {@code timestampsToSearch} is empty
     * @throws NullPointerException     if {@code timestampsToSearch} is {@code null}
     * @throws IllegalArgumentException if any target timestamp is negative
     * @throws IllegalStateException    if the consumer has been closed
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(timestampsToSearch, "Timestamps to search cannot be null");
            for (Map.Entry<TopicPartition, Long> target : timestampsToSearch.entrySet()) {
                if (target.getValue() < 0L) {
                    throw new IllegalArgumentException("The target time for partition " + target.getKey() + " is " + target.getValue() + ". The target time cannot be negative.");
                }
            }
            if (timestampsToSearch.isEmpty()) {
                return Collections.emptyMap();
            }
            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(timestampsToSearch, CompletableEvent.calculateDeadlineMs(this.time, timeout), true);
            if (timeout.toMillis() == 0L) {
                // Fire-and-forget: enqueue the lookup but do not block on the result.
                this.applicationEventHandler.add(listOffsetsEvent);
                return listOffsetsEvent.emptyResults();
            }
            Map<TopicPartition, OffsetAndTimestampInternal> internalResults = this.applicationEventHandler.addAndGet(listOffsetsEvent);
            return internalResults.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().buildOffsetAndTimestamp()));
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns the beginning offsets for {@code partitions}, delegating to
     * {@link #beginningOffsets(Collection, Duration)} with a timeout of
     * {@code defaultApiTimeoutMs} milliseconds.
     *
     * @param partitions the partitions to look up
     * @return the map produced by the timed overload
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.beginningOffsets(partitions, defaultTimeout);
    }

    /**
     * Returns the beginning offsets for {@code partitions} by calling
     * {@code beginningOrEndOffset} with the timestamp value {@code -2L}.
     *
     * @param partitions the partitions to look up
     * @param timeout    forwarded unchanged
     * @return the map produced by {@code beginningOrEndOffset}
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        // NOTE(review): -2L is presumably Kafka's "earliest" sentinel timestamp - confirm against ListOffsetsRequest.
        long beginningTimestamp = -2L;
        return this.beginningOrEndOffset(partitions, beginningTimestamp, timeout);
    }

    /**
     * Returns the end offsets for {@code partitions}, delegating to
     * {@link #endOffsets(Collection, Duration)} with a timeout of {@code defaultApiTimeoutMs}
     * milliseconds.
     *
     * @param partitions the partitions to look up
     * @return the map produced by the timed overload
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return this.endOffsets(partitions, defaultTimeout);
    }

    /**
     * Returns the end offsets for {@code partitions} by calling {@code beginningOrEndOffset}
     * with the timestamp value {@code -1L}.
     *
     * @param partitions the partitions to look up
     * @param timeout    forwarded unchanged
     * @return the map produced by {@code beginningOrEndOffset}
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        // NOTE(review): -1L is presumably Kafka's "latest" sentinel timestamp - confirm against ListOffsetsRequest.
        long endTimestamp = -1L;
        return this.beginningOrEndOffset(partitions, endTimestamp, timeout);
    }

    /**
     * Shared implementation behind the beginning/end offset lookups.
     *
     * <p>Every partition is mapped to the same {@code timestamp} and sent in a
     * {@link ListOffsetsEvent}. When {@code timeout} is zero the event is only enqueued and the
     * event's {@code emptyResults()} is returned without waiting.
     *
     * @param partitions the partitions to look up; must not be {@code null}
     * @param timestamp  the timestamp value associated with every partition in the request
     * @param timeout    used to derive the event deadline
     * @return offset per partition, or an empty map when {@code partitions} is empty
     * @throws NullPointerException  if {@code partitions} is {@code null}
     * @throws IllegalStateException if the consumer has been closed
     */
    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(partitions, "Partitions cannot be null");
            if (partitions.isEmpty()) {
                return Collections.emptyMap();
            }
            Map<TopicPartition, Long> targets = partitions.stream().collect(Collectors.toMap(Function.identity(), ignoredPartition -> timestamp));
            ListOffsetsEvent lookupEvent = new ListOffsetsEvent(targets, CompletableEvent.calculateDeadlineMs(this.time, timeout), false);
            if (timeout.isZero()) {
                // Fire-and-forget: enqueue the lookup but do not block on the result.
                this.applicationEventHandler.add(lookupEvent);
                return lookupEvent.emptyResults();
            }
            Map<TopicPartition, OffsetAndTimestampInternal> lookupResults = this.applicationEventHandler.addAndGet(lookupEvent);
            return lookupResults.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, result -> result.getValue().offset()));
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns the lag for {@code topicPartition} as reported by the subscription state.
     *
     * <p>When no lag is available and neither an end offset is known nor a request for it is
     * already flagged, the method flags the request and issues a zero-timeout
     * {@code endOffsets} call, then returns empty.
     *
     * @param topicPartition the partition to inspect
     * @return the lag, or {@link OptionalLong#empty()} when the subscription state has none
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        this.acquireAndEnsureOpen();
        try {
            Long knownLag = this.subscriptions.partitionLag(topicPartition, this.isolationLevel);
            if (knownLag != null) {
                return OptionalLong.of(knownLag);
            }
            boolean endOffsetUnknown = this.subscriptions.partitionEndOffset(topicPartition, this.isolationLevel) == null;
            if (endOffsetUnknown && !this.subscriptions.partitionEndOffsetRequested(topicPartition)) {
                this.log.info("Requesting the log end offset for {} in order to compute lag", (Object)topicPartition);
                this.subscriptions.requestPartitionEndOffset(topicPartition);
                this.endOffsets(Collections.singleton(topicPartition), Duration.ofMillis(0L));
            }
            return OptionalLong.empty();
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns the current consumer group metadata.
     *
     * @return the value held in {@code groupMetadata}
     * @throws InvalidGroupIdException if no group metadata is present
     * @throws IllegalStateException   if the consumer has been closed
     */
    @Override
    public ConsumerGroupMetadata groupMetadata() {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            return this.groupMetadata.get().get();
        }
        finally {
            this.release();
        }
    }

    /**
     * Not implemented by this consumer: the call only logs a warning and returns.
     */
    @Override
    public void enforceRebalance() {
        this.log.warn("Operation not supported in new consumer group protocol");
    }

    /**
     * Not implemented by this consumer: the call only logs a warning and returns.
     *
     * @param reason ignored by this implementation
     */
    @Override
    public void enforceRebalance(String reason) {
        this.log.warn("Operation not supported in new consumer group protocol");
    }

    /**
     * Closes the consumer, delegating to {@link #close(Duration)} with a 30 second timeout.
     */
    @Override
    public void close() {
        Duration defaultCloseTimeout = Duration.ofSeconds(30L);
        this.close(defaultCloseTimeout);
    }

    /**
     * Closes the consumer if it is not already closed.
     *
     * <p>The {@code closed} flag is set and the consumer is released in a {@code finally}
     * block, so the consumer ends up marked closed even if the shutdown sequence throws.
     *
     * @param timeout upper bound handed to the shutdown sequence
     * @throws IllegalArgumentException if {@code timeout.toMillis()} is negative
     */
    @Override
    public void close(Duration timeout) {
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        this.acquire();
        try {
            if (this.closed) {
                return;
            }
            this.close(timeout, false);
        }
        finally {
            this.closed = true;
            this.release();
        }
    }

    /**
     * Runs the shutdown sequence, collecting the first failure in {@code firstException}
     * rather than aborting at the first failing step.
     *
     * @param timeout          budget for the whole sequence, tracked through a single timer
     * @param swallowException when {@code true}, a collected failure is not rethrown
     * @throws InterruptException if the first collected failure is an {@code InterruptException}
     *                            and {@code swallowException} is {@code false}
     * @throws KafkaException     wrapping any other first failure when {@code swallowException}
     *                            is {@code false}
     */
    private void close(Duration timeout, boolean swallowException) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        // Wakeups are disabled before any of the close steps run.
        this.wakeupTrigger.disableWakeups();
        Timer closeTimer = this.time.timer(timeout);
        this.clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
        closeTimer.update();
        // Both steps below share closeTimer and record their failure in firstException.
        Utils.swallow(this.log, Level.ERROR, "Failed to release assignment before closing consumer", () -> this.releaseAssignmentAndLeaveGroup(closeTimer), firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> this.awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException);
        if (this.applicationEventHandler != null) {
            // The event handler is closed with whatever time is left on the timer.
            Utils.closeQuietly(() -> this.applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
        }
        closeTimer.update();
        if (this.backgroundEventReaper != null && this.backgroundEventQueue != null) {
            this.backgroundEventReaper.reap(this.backgroundEventQueue);
        }
        // Remaining resources are closed one by one; each failure is recorded, not thrown.
        Utils.closeQuietly(this.interceptors, "consumer interceptors", firstException);
        Utils.closeQuietly((AutoCloseable)this.kafkaConsumerMetrics, "kafka consumer metrics", firstException);
        Utils.closeQuietly((AutoCloseable)this.metrics, "consumer metrics", firstException);
        Utils.closeQuietly(this.deserializers, "consumer deserializers", firstException);
        this.clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly((AutoCloseable)reporter, "async consumer telemetry reporter", firstException));
        AppInfoParser.unregisterAppInfo("kafka.consumer", this.clientId, this.metrics);
        this.log.debug("Kafka consumer has been closed");
        Throwable exception = firstException.get();
        if (exception != null && !swallowException) {
            // InterruptException is rethrown as-is; everything else is wrapped.
            if (exception instanceof InterruptException) {
                throw (InterruptException)exception;
            }
            throw new KafkaException("Failed to close kafka consumer", exception);
        }
    }

    /**
     * Close-time step that commits (when auto-commit is enabled), enqueues a
     * {@link CommitOnCloseEvent} and an {@link UnsubscribeEvent}, then processes background
     * events until the unsubscribe completes or the timer runs out.
     *
     * <p>Does nothing when no group metadata is present. A timeout while waiting is logged as a
     * warning and not propagated. The timer is updated on every exit from the wait.
     *
     * @param timer bounds the commit and the wait for the unsubscribe to complete
     */
    private void releaseAssignmentAndLeaveGroup(Timer timer) {
        boolean hasGroup = this.groupMetadata.get().isPresent();
        if (!hasGroup) {
            return;
        }
        if (this.autoCommitEnabled) {
            this.commitSyncAllConsumed(timer);
        }
        this.applicationEventHandler.add(new CommitOnCloseEvent());
        this.log.info("Releasing assignment and leaving group before closing consumer");
        UnsubscribeEvent leaveGroupEvent = new UnsubscribeEvent(CompletableEvent.calculateDeadlineMs(timer));
        this.applicationEventHandler.add(leaveGroupEvent);
        try {
            this.processBackgroundEvents(leaveGroupEvent.future(), timer);
            this.log.info("Completed releasing assignment and sending leave group to close consumer");
        }
        catch (TimeoutException timedOut) {
            // Closing proceeds regardless; the timeout is only reported.
            this.log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't complete it within {} ms. It will proceed to close.", (Object)timer.timeoutMs());
        }
        finally {
            timer.update();
        }
    }

    /**
     * Synchronously commits {@code subscriptions.allConsumed()} using the time remaining on
     * {@code timer}. Any {@link Exception} from the commit is logged as a warning and not
     * propagated. The timer is updated after the attempt.
     *
     * @param timer supplies the remaining time budget for the commit
     */
    void commitSyncAllConsumed(Timer timer) {
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = this.subscriptions.allConsumed();
        this.log.debug("Sending synchronous auto-commit of offsets {} on closing", consumedOffsets);
        try {
            Duration remaining = Duration.ofMillis(timer.remainingMs());
            this.commitSync(consumedOffsets, remaining);
        }
        catch (Exception commitFailure) {
            // Best effort: a failed auto-commit must not abort the caller.
            this.log.warn("Synchronous auto-commit of offsets {} failed: {}", consumedOffsets, (Object)commitFailure.getMessage());
        }
        timer.update();
    }

    /**
     * Forwards the wakeup request to {@code wakeupTrigger}. Unlike most public methods in this
     * class, it does not acquire the consumer first.
     */
    @Override
    public void wakeup() {
        this.wakeupTrigger.wakeup();
    }

    /**
     * Synchronously commits {@code subscriptions.allConsumed()}, delegating to
     * {@link #commitSync(Map, Duration)}.
     *
     * @param timeout forwarded unchanged
     */
    @Override
    public void commitSync(Duration timeout) {
        Map<TopicPartition, OffsetAndMetadata> consumedOffsets = this.subscriptions.allConsumed();
        this.commitSync(consumedOffsets, timeout);
    }

    /**
     * Synchronously commits {@code offsets}, delegating to {@link #commitSync(Map, Duration)}
     * with a timeout of {@code defaultApiTimeoutMs} milliseconds.
     *
     * @param offsets the offsets to commit
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        this.commitSync(offsets, defaultTimeout);
    }

    /**
     * Synchronously commits {@code offsets}.
     *
     * <p>The commit event is submitted first; the method then waits for any pending async commit
     * (running its callbacks), waits for the submitted commit itself, and finally notifies the
     * interceptors. The active wakeup task is cleared, the elapsed time recorded and the
     * consumer released on every exit path.
     *
     * @param offsets the offsets to commit
     * @param timeout bounds both the event deadline and the local wait timer
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        this.acquireAndEnsureOpen();
        long startedAtNanos = this.time.nanoseconds();
        try {
            SyncCommitEvent commitEvent = new SyncCommitEvent(offsets, CompletableEvent.calculateDeadlineMs(this.time, timeout));
            CompletableFuture<Void> pendingCommit = this.commit(commitEvent);
            Timer waitTimer = this.time.timer(timeout.toMillis());
            this.awaitPendingAsyncCommitsAndExecuteCommitCallbacks(waitTimer, true);
            this.wakeupTrigger.setActiveTask(pendingCommit);
            ConsumerUtils.getResult(pendingCommit, waitTimer);
            this.interceptors.onCommit(offsets);
        }
        finally {
            this.wakeupTrigger.clearTask();
            long elapsedNanos = this.time.nanoseconds() - startedAtNanos;
            this.kafkaConsumerMetrics.recordCommitSync(elapsedNanos);
            this.release();
        }
    }

    /**
     * Waits for the most recent pending async commit (if any) to finish, then runs the queued
     * offset-commit callbacks.
     *
     * <p>The wait is done on a separate future that completes with {@code null} whenever the
     * pending commit completes, normally or exceptionally, so the outcome of that commit is not
     * surfaced by the wait itself. Returns immediately, without running callbacks, when there is
     * no pending async commit.
     *
     * @param timer        bounds the wait; updated on every exit from the wait
     * @param enableWakeup when {@code true}, the wait is registered as the active wakeup task and
     *                     cleared afterwards
     */
    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean enableWakeup) {
        if (this.lastPendingAsyncCommit == null) {
            return;
        }
        try {
            CompletableFuture<Void> futureToAwait = new CompletableFuture<Void>();
            // Completion of either kind releases the waiter; the commit's own result is ignored here.
            this.lastPendingAsyncCommit.whenComplete((v, t2) -> futureToAwait.complete(null));
            if (enableWakeup) {
                this.wakeupTrigger.setActiveTask(futureToAwait);
            }
            ConsumerUtils.getResult(futureToAwait, timer);
            this.lastPendingAsyncCommit = null;
        }
        finally {
            if (enableWakeup) {
                this.wakeupTrigger.clearTask();
            }
            timer.update();
        }
        this.offsetCommitCallbackInvoker.executeCallbacks();
    }

    /**
     * Returns the client instance id obtained from the telemetry reporter.
     *
     * @param timeout forwarded to {@code ClientTelemetryUtils.fetchClientInstanceId}
     * @return the id returned by the telemetry utility
     * @throws IllegalStateException if no telemetry reporter is present
     */
    @Override
    public Uuid clientInstanceId(Duration timeout) {
        if (this.clientTelemetryReporter.isPresent()) {
            return ClientTelemetryUtils.fetchClientInstanceId(this.clientTelemetryReporter.get(), timeout);
        }
        throw new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.");
    }

    /**
     * Returns a read-only view of {@code subscriptions.assignedPartitions()}.
     *
     * @return an unmodifiable set wrapper
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public Set<TopicPartition> assignment() {
        this.acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
        }
        finally {
            this.release();
        }
    }

    /**
     * Returns a read-only view of {@code subscriptions.subscription()}.
     *
     * @return an unmodifiable set wrapper
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public Set<String> subscription() {
        this.acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.subscription());
        }
        finally {
            this.release();
        }
    }

    /**
     * Manually assigns {@code partitions} to this consumer.
     *
     * <p>An empty collection is treated as a call to {@link #unsubscribe()}. Otherwise buffered
     * fetch data is trimmed to the currently assigned partitions that remain in the new
     * assignment, an {@link AssignmentChangeEvent} carrying the consumed offsets is enqueued, and
     * the subscription state is updated; if that update reports a change, a
     * {@link NewTopicsMetadataUpdateRequestEvent} is enqueued as well.
     *
     * @param partitions the partitions to assign
     * @throws IllegalArgumentException if {@code partitions} is {@code null}, or contains a
     *                                  {@code null} element or an element with a blank topic
     * @throws IllegalStateException    if the consumer has been closed
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        this.acquireAndEnsureOpen();
        try {
            if (partitions == null) {
                throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
            }
            if (partitions.isEmpty()) {
                this.unsubscribe();
                return;
            }
            for (TopicPartition candidate : partitions) {
                String topicName = candidate == null ? null : candidate.topic();
                if (Utils.isBlank(topicName)) {
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                }
            }
            // Keep buffered data only for partitions that stay assigned.
            HashSet<TopicPartition> retainedPartitions = this.subscriptions.assignedPartitions().stream()
                    .filter(partitions::contains)
                    .collect(Collectors.toCollection(HashSet::new));
            this.fetchBuffer.retainAll(retainedPartitions);
            this.applicationEventHandler.add(new AssignmentChangeEvent(this.subscriptions.allConsumed(), this.time.milliseconds()));
            String assignedDescription = partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "));
            this.log.info("Assigned to partition(s): {}", (Object)assignedDescription);
            boolean assignmentChanged = this.subscriptions.assignFromUser(new HashSet<TopicPartition>(partitions));
            if (assignmentChanged) {
                this.applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
            }
        }
        finally {
            this.release();
        }
    }

    /**
     * Re-evaluates the subscribed pattern against the topics of {@code cluster}. When the
     * subscription state reports that the matching set changed, a {@link SubscriptionChangeEvent}
     * is enqueued and a metadata update for new topics is requested.
     *
     * @param cluster the cluster snapshot whose topic names are matched
     */
    private void updatePatternSubscription(Cluster cluster) {
        Set<String> matchingTopics = cluster.topics().stream()
                .filter(this.subscriptions::matchesSubscribedPattern)
                .collect(Collectors.toSet());
        boolean subscriptionChanged = this.subscriptions.subscribeFromPattern(matchingTopics);
        if (!subscriptionChanged) {
            return;
        }
        this.applicationEventHandler.add(new SubscriptionChangeEvent());
        this.metadataVersionSnapshot = this.metadata.requestUpdateForNewTopics();
    }

    /**
     * Drops all buffered fetch data, enqueues an {@link UnsubscribeEvent} and processes
     * background events until it completes, then resets the group metadata.
     *
     * <p>The wait uses a timer created with {@code Long.MAX_VALUE}. A timeout during the wait is
     * logged and not propagated; any other {@link Exception} is logged and rethrown.
     *
     * @throws IllegalStateException if the consumer has been closed
     */
    @Override
    public void unsubscribe() {
        this.acquireAndEnsureOpen();
        try {
            this.fetchBuffer.retainAll(Collections.emptySet());
            Timer unboundedTimer = this.time.timer(Long.MAX_VALUE);
            UnsubscribeEvent leaveEvent = new UnsubscribeEvent(CompletableEvent.calculateDeadlineMs(unboundedTimer));
            this.applicationEventHandler.add(leaveEvent);
            this.log.info("Unsubscribing all topics or patterns and assigned partitions {}", this.subscriptions.assignedPartitions());
            try {
                this.processBackgroundEvents(leaveEvent.future(), unboundedTimer);
                this.log.info("Unsubscribed all topics or patterns and assigned partitions");
            }
            catch (TimeoutException timedOut) {
                this.log.error("Failed while waiting for the unsubscribe event to complete");
            }
            this.resetGroupMetadata();
        }
        catch (Exception failure) {
            this.log.error("Unsubscribe failed", (Throwable)failure);
            throw failure;
        }
        finally {
            this.release();
        }
    }

    /**
     * Replaces the held group metadata, when present, with a freshly initialized instance that
     * keeps the previous group id and group instance id. An empty value stays empty.
     */
    private void resetGroupMetadata() {
        this.groupMetadata.updateAndGet(previous -> previous.map(old -> {
            return this.initializeConsumerGroupMetadata(old.groupId(), old.groupInstanceId());
        }));
    }

    /**
     * Unsupported by this consumer implementation; always throws.
     *
     * @param timeoutMs ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    @Deprecated
    public ConsumerRecords<K, V> poll(long timeoutMs) {
        throw new UnsupportedOperationException("Consumer.poll(long) is not supported when \"group.protocol\" is \"consumer\". This method is deprecated and will be removed in the next major release.");
    }

    /**
     * Package-private accessor for the {@code wakeupTrigger} field.
     *
     * @return the wakeup trigger used by this consumer
     */
    WakeupTrigger wakeupTrigger() {
        return this.wakeupTrigger;
    }

    /**
     * Returns already-buffered fetch data if there is any; otherwise waits on the fetch buffer
     * and collects again.
     *
     * <p>The wait budget is the timer's remaining time, capped by
     * {@code applicationEventHandler.maximumTimeToWait()} when committed-offset management is
     * enabled, and further capped by {@code retryBackoffMs} while the cached flag says not all
     * fetch positions are known.
     *
     * @param timer the caller's timer; advanced to the inner poll timer's current time after the wait
     * @return the collected fetch
     * @throws InterruptException if the wait is interrupted (logged at trace level and rethrown)
     */
    private Fetch<K, V> pollForFetches(Timer timer) {
        // The budget is computed before the first collection attempt.
        long pollTimeout;
        if (this.isCommittedOffsetsManagementEnabled()) {
            pollTimeout = Math.min(this.applicationEventHandler.maximumTimeToWait(), timer.remainingMs());
        } else {
            pollTimeout = timer.remainingMs();
        }
        Fetch<K, V> buffered = this.collectFetch();
        if (!buffered.isEmpty()) {
            return buffered;
        }
        if (!this.cachedSubscriptionHasAllFetchPositions) {
            pollTimeout = Math.min(pollTimeout, this.retryBackoffMs);
        }
        this.log.trace("Polling for fetches with timeout {}", (Object)pollTimeout);
        Timer waitTimer = this.time.timer(pollTimeout);
        this.wakeupTrigger.setFetchAction(this.fetchBuffer);
        try {
            this.fetchBuffer.awaitNotEmpty(waitTimer);
        }
        catch (InterruptException interrupted) {
            this.log.trace("Interrupt during fetch", (Throwable)interrupted);
            throw interrupted;
        }
        finally {
            timer.update(waitTimer.currentTimeMs());
            this.wakeupTrigger.clearTask();
        }
        return this.collectFetch();
    }

    /**
     * Collects whatever the fetch collector can produce from the fetch buffer and then wakes up
     * the network thread via the application event handler.
     *
     * @return the fetch produced by {@code fetchCollector.collectFetch}
     */
    private Fetch<K, V> collectFetch() {
        Fetch<K, V> fetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        // The wakeup happens after collection; it is skipped if collection throws.
        this.applicationEventHandler.wakeupNetworkThread();
        return fetch;
    }

    /**
     * Tries to make sure every assigned partition has a fetch position.
     *
     * <p>Positions are validated first. If some are still missing, committed offsets are
     * consulted (only when committed-offset management is enabled) and any remaining
     * initializing positions are reset through a {@link ResetPositionsEvent}.
     *
     * @param timer bounds every blocking step
     * @return {@code true} if all steps completed; {@code false} if the committed-offset
     *         initialization did not finish or any step timed out
     */
    private boolean updateFetchPositions(Timer timer) {
        try {
            this.applicationEventHandler.addAndGet(new ValidatePositionsEvent(CompletableEvent.calculateDeadlineMs(timer)));
            boolean allPositionsKnown = this.subscriptions.hasAllFetchPositions();
            this.cachedSubscriptionHasAllFetchPositions = allPositionsKnown;
            if (allPositionsKnown) {
                return true;
            }
            if (this.isCommittedOffsetsManagementEnabled()) {
                if (!this.initWithCommittedOffsetsIfNeeded(timer)) {
                    return false;
                }
            }
            this.subscriptions.resetInitializingPositions();
            this.applicationEventHandler.addAndGet(new ResetPositionsEvent(CompletableEvent.calculateDeadlineMs(timer)));
            return true;
        }
        catch (TimeoutException timedOut) {
            // A timeout simply means "not done yet" to the caller.
            return false;
        }
    }

    /**
     * Reports whether group metadata is currently present.
     *
     * @return {@code true} when {@code groupMetadata} holds a non-empty value
     */
    private boolean isCommittedOffsetsManagementEnabled() {
        return this.groupMetadata.get().isPresent();
    }

    /**
     * Refreshes positions of initializing partitions from their committed offsets.
     *
     * <p>A pending {@link FetchCommittedOffsetsEvent} is reused when it targets the same
     * partitions and has not reached its deadline; otherwise a new one is created and enqueued.
     * The pending event is kept across a timeout so that a later call can pick it up, and
     * dropped on success or on any other failure.
     *
     * @param timer bounds the wait for the fetch result
     * @return {@code true} if there was nothing to initialize or the committed offsets were
     *         applied; {@code false} if the wait timed out
     * @throws InterruptException if the wait is interrupted (rethrown unchanged)
     * @throws KafkaException     for any other failure, wrapped via
     *                            {@code ConsumerUtils.maybeWrapAsKafkaException}
     */
    private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
        Set<TopicPartition> initializingPartitions = this.subscriptions.initializingPartitions();
        if (initializingPartitions.isEmpty()) {
            return true;
        }
        this.log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
        if (!this.canReusePendingOffsetFetchEvent(initializingPartitions)) {
            // The event deadline is the larger of the default API timeout and the timer's remaining time.
            long timeoutMs = Math.max((long)this.defaultApiTimeoutMs, timer.remainingMs());
            long deadlineMs = CompletableEvent.calculateDeadlineMs(this.time, timeoutMs);
            this.pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, deadlineMs);
            this.applicationEventHandler.add(this.pendingOffsetFetchEvent);
        }
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = this.pendingOffsetFetchEvent.future();
        try {
            this.wakeupTrigger.setActiveTask(future);
            Map<TopicPartition, OffsetAndMetadata> offsets = ConsumerUtils.getResult(future, timer);
            this.pendingOffsetFetchEvent = null;
            ConsumerUtils.refreshCommittedOffsets(offsets, this.metadata, this.subscriptions);
            return true;
        }
        catch (TimeoutException e) {
            // pendingOffsetFetchEvent is intentionally left in place for reuse.
            this.log.debug("The committed offsets for the following partition(s) could not be refreshed within the timeout: {} ", initializingPartitions);
            return false;
        }
        catch (InterruptException e) {
            throw e;
        }
        catch (Throwable t2) {
            this.pendingOffsetFetchEvent = null;
            throw ConsumerUtils.maybeWrapAsKafkaException(t2);
        }
        finally {
            this.wakeupTrigger.clearTask();
        }
    }

    /**
     * Decides whether the pending committed-offset fetch can serve a request for
     * {@code partitions}.
     *
     * @param partitions the partitions the caller needs offsets for
     * @return {@code true} only if a pending event exists, targets exactly {@code partitions},
     *         and its deadline is later than the current time
     */
    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions) {
        return this.pendingOffsetFetchEvent != null
                && this.pendingOffsetFetchEvent.partitions().equals(partitions)
                && this.pendingOffsetFetchEvent.deadlineMs() > this.time.milliseconds();
    }

    /**
     * Forwards the leader epoch carried by {@code offsetAndMetadata}, when there is one, to
     * {@code metadata.updateLastSeenEpochIfNewer}. A {@code null} value is ignored.
     *
     * @param topicPartition    the partition the epoch belongs to
     * @param offsetAndMetadata the committed offset entry; may be {@code null}
     */
    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        if (offsetAndMetadata == null) {
            return;
        }
        offsetAndMetadata.leaderEpoch().ifPresent(leaderEpoch -> this.metadata.updateLastSeenEpochIfNewer(topicPartition, (int)leaderEpoch));
    }

    /**
     * Runs the bookkeeping steps in a fixed order and then updates fetch positions: fenced
     * instance check, queued commit callbacks, subscription metadata refresh, background event
     * processing.
     *
     * @param timer bounds the fetch-position update
     * @return the result of {@code updateFetchPositions(timer)}
     */
    @Override
    public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
        this.maybeThrowFencedInstanceException();
        this.offsetCommitCallbackInvoker.executeCallbacks();
        this.maybeUpdateSubscriptionMetadata();
        this.processBackgroundEvents();
        return this.updateFetchPositions(timer);
    }

    /**
     * Subscribes to {@code topics} without a rebalance listener.
     *
     * @param topics the topic names; validation is done by {@code subscribeInternal}
     */
    @Override
    public void subscribe(Collection<String> topics) {
        this.subscribeInternal(topics, Optional.empty());
    }

    /**
     * Subscribes to {@code topics} with the given rebalance listener.
     *
     * @param topics   the topic names; validation is done by {@code subscribeInternal}
     * @param listener the rebalance listener; must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        Optional<ConsumerRebalanceListener> wrappedListener = Optional.of(listener);
        this.subscribeInternal(topics, wrappedListener);
    }

    /**
     * Subscribes to all topics matching {@code pattern} without a rebalance listener.
     *
     * @param pattern the topic pattern; validation is done by {@code subscribeInternal}
     */
    @Override
    public void subscribe(Pattern pattern) {
        this.subscribeInternal(pattern, Optional.empty());
    }

    /**
     * Subscribes to all topics matching {@code pattern} with the given rebalance listener.
     *
     * @param pattern  the topic pattern; validation is done by {@code subscribeInternal}
     * @param listener the rebalance listener; must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        Optional<ConsumerRebalanceListener> wrappedListener = Optional.of(listener);
        this.subscribeInternal(pattern, wrappedListener);
    }

    /**
     * Acquires the consumer for the calling thread and verifies it has not been closed. If it
     * has, the acquisition is undone before the exception is thrown.
     *
     * @throws IllegalStateException if the consumer has already been closed
     */
    private void acquireAndEnsureOpen() {
        this.acquire();
        if (!this.closed) {
            return;
        }
        this.release();
        throw new IllegalStateException("This consumer has already been closed.");
    }

    /**
     * Claims the consumer for the calling thread and bumps the reference count.
     *
     * <p>The claim succeeds when the calling thread already owns the consumer or when the owner
     * slot can be swapped from {@code -1} to the caller's thread id.
     *
     * @throws ConcurrentModificationException if another thread currently owns the consumer
     */
    private void acquire() {
        Thread caller = Thread.currentThread();
        long callerId = caller.getId();
        boolean alreadyOwner = callerId == this.currentThread.get();
        if (!alreadyOwner && !this.currentThread.compareAndSet(-1L, callerId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. currentThread(name: " + caller.getName() + ", id: " + callerId + ") otherThread(id: " + this.currentThread.get() + ")");
        }
        this.refCount.incrementAndGet();
    }

    /**
     * Drops one reference; when the count reaches zero the owner slot is reset to {@code -1}.
     */
    private void release() {
        if (this.refCount.decrementAndGet() != 0) {
            return;
        }
        this.currentThread.set(-1L);
    }

    /**
     * Pattern-based subscription: validates the pattern, records it in the subscription state,
     * requests a metadata update and immediately evaluates the pattern against the current
     * cluster snapshot.
     *
     * @param pattern  the topic pattern; must be non-null with a non-empty string form
     * @param listener optional rebalance listener handed to the subscription state
     * @throws InvalidGroupIdException  if no group metadata is present
     * @throws IllegalArgumentException if {@code pattern} is {@code null} or empty
     * @throws IllegalStateException    if the consumer has been closed
     */
    private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            boolean missing = pattern == null;
            if (missing || pattern.toString().isEmpty()) {
                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (missing ? "null" : "empty"));
            }
            this.log.info("Subscribed to pattern: '{}'", (Object)pattern);
            this.subscriptions.subscribe(pattern, listener);
            this.metadata.requestUpdateForNewTopics();
            Cluster currentCluster = this.metadata.fetch();
            this.updatePatternSubscription(currentCluster);
        }
        finally {
            this.release();
        }
    }

    /**
     * Topic-list subscription.
     *
     * <p>An empty collection is treated as a call to {@link #unsubscribe()}. Otherwise buffered
     * fetch data is trimmed to currently assigned partitions whose topic stays subscribed, the
     * subscription state is updated (requesting a metadata update when it reports a change), and
     * a {@link SubscriptionChangeEvent} is enqueued.
     *
     * @param topics   the topic names
     * @param listener optional rebalance listener handed to the subscription state
     * @throws InvalidGroupIdException  if no group metadata is present
     * @throws IllegalArgumentException if {@code topics} is {@code null} or contains a blank name
     * @throws IllegalStateException    if the consumer has been closed
     */
    private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (topics.isEmpty()) {
                this.unsubscribe();
                return;
            }
            for (String topicName : topics) {
                if (Utils.isBlank(topicName)) {
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
            }
            // Keep buffered data only for partitions whose topic is still subscribed.
            HashSet<TopicPartition> retainedPartitions = this.subscriptions.assignedPartitions().stream()
                    .filter(assigned -> topics.contains(assigned.topic()))
                    .collect(Collectors.toCollection(HashSet::new));
            this.fetchBuffer.retainAll(retainedPartitions);
            this.log.info("Subscribed to topic(s): {}", (Object)String.join(", ", topics));
            boolean subscriptionChanged = this.subscriptions.subscribe(new HashSet<String>(topics), listener);
            if (subscriptionChanged) {
                this.metadataVersionSnapshot = this.metadata.requestUpdateForNewTopics();
            }
            this.applicationEventHandler.add(new SubscriptionChangeEvent());
        }
        finally {
            this.release();
        }
    }

    /**
     * Drains the background event queue and processes every drained event.
     *
     * <p>Events that are {@link CompletableEvent}s are registered with the reaper before being
     * processed. A failure while handling one event does not stop the loop: the first failure is
     * remembered and rethrown after all events were handled and the reaper has run; later
     * failures are only logged.
     *
     * @return {@code true} if at least one event was drained
     * @throws KafkaException the first failure encountered, wrapped if necessary
     */
    private boolean processBackgroundEvents() {
        AtomicReference<KafkaException> firstError = new AtomicReference<KafkaException>();
        LinkedList<BackgroundEvent> events = new LinkedList<BackgroundEvent>();
        this.backgroundEventQueue.drainTo(events);
        for (BackgroundEvent event : events) {
            try {
                if (event instanceof CompletableEvent) {
                    this.backgroundEventReaper.add((CompletableEvent<?>)event);
                }
                this.backgroundEventProcessor.process(event);
            }
            catch (Throwable t2) {
                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t2);
                if (firstError.compareAndSet(null, e)) {
                    continue;
                }
                // Only failures after the first one are logged here; the first is rethrown below.
                this.log.warn("An error occurred when processing the background event: {}", (Object)e.getMessage(), (Object)e);
            }
        }
        this.backgroundEventReaper.reap(this.time.milliseconds());
        KafkaException failure = firstError.get();
        if (failure != null) {
            throw failure;
        }
        return !events.isEmpty();
    }

    /**
     * Keeps processing background events until {@code future} yields a result or {@code timer} expires.
     *
     * <p>Each pass first runs {@link #processBackgroundEvents()}. If the future is already done its result is
     * returned through {@code ConsumerUtils.getResult(future)}. Otherwise, when the pass handled no events, the
     * method waits on the future using a timer obtained from {@code time.timer(100L)}. A {@code TimeoutException}
     * raised by either attempt is swallowed so the loop can try again; {@code timer} is updated at the end of every
     * pass.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour change" for this
     * method; confirm the try/catch/finally shape against the upstream source.
     *
     * @param future the future whose result is awaited
     * @param timer  bounds the total time spent in this method
     * @param <T>    the result type of {@code future}
     * @return the result obtained from {@code future}
     * @throws TimeoutException if {@code timer} expires before a result is obtained
     */
    <T> T processBackgroundEvents(Future<T> future, Timer timer) {
        do {
            boolean processedAny = this.processBackgroundEvents();
            try {
                if (future.isDone()) {
                    return ConsumerUtils.getResult(future);
                }
                if (!processedAny) {
                    return ConsumerUtils.getResult(future, this.time.timer(100L));
                }
            } catch (TimeoutException ignored) {
                // Deliberately ignored: the loop retries for as long as the caller's timer has not expired.
            } finally {
                timer.update();
            }
        } while (timer.notExpired());
        throw new TimeoutException("Operation timed out before completion");
    }

    /**
     * Invokes the rebalance listener callback selected by {@code methodName} and packages the outcome into a
     * {@code ConsumerRebalanceListenerCallbackCompletedEvent}.
     *
     * @param rebalanceListenerInvoker invoker used to call the listener method
     * @param methodName               which listener method to call
     * @param partitions               partitions handed to the listener method
     * @param future                   future passed through unchanged to the resulting event
     * @return an event carrying {@code methodName}, {@code future}, and the exception returned by the invoker
     *         (wrapped via {@code ConsumerUtils.maybeWrapAsKafkaException}), or an empty optional if the invoker
     *         returned {@code null}
     * @throws IllegalArgumentException if {@code methodName} is not one of the three handled listener methods
     */
    static ConsumerRebalanceListenerCallbackCompletedEvent invokeRebalanceCallbacks(ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, ConsumerRebalanceListenerMethodName methodName, SortedSet<TopicPartition> partitions, CompletableFuture<Void> future) {
        final Exception callbackFailure;
        switch (methodName) {
            case ON_PARTITIONS_REVOKED:
                callbackFailure = rebalanceListenerInvoker.invokePartitionsRevoked(partitions);
                break;
            case ON_PARTITIONS_ASSIGNED:
                callbackFailure = rebalanceListenerInvoker.invokePartitionsAssigned(partitions);
                break;
            case ON_PARTITIONS_LOST:
                callbackFailure = rebalanceListenerInvoker.invokePartitionsLost(partitions);
                break;
            default:
                throw new IllegalArgumentException("The method " + methodName.fullyQualifiedMethodName() + " to invoke was not expected");
        }

        final Optional<KafkaException> error;
        if (callbackFailure == null) {
            error = Optional.empty();
        } else {
            error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(callbackFailure, "User rebalance callback throws an error"));
        }
        return new ConsumerRebalanceListenerCallbackCompletedEvent(methodName, future, error);
    }

    /**
     * @return the client id stored in this consumer's {@code clientId} field
     */
    @Override
    public String clientId() {
        return clientId;
    }

    /**
     * @return the {@code Metrics} instance stored in this consumer's {@code metrics} field
     */
    @Override
    public Metrics metricsRegistry() {
        return metrics;
    }

    /**
     * @return the {@code KafkaConsumerMetrics} instance stored in this consumer's {@code kafkaConsumerMetrics} field
     */
    @Override
    public KafkaConsumerMetrics kafkaConsumerMetrics() {
        return kafkaConsumerMetrics;
    }

    /**
     * Throws a {@code FencedInstanceIdException} when the {@code asyncCommitFenced} flag is set; otherwise does
     * nothing.
     *
     * <p>The exception message names the group instance id taken from {@code groupMetadata}. If either the group
     * metadata or its group instance id is absent, an error is logged and the id is reported as {@code "unknown"}.
     *
     * @throws FencedInstanceIdException if {@code asyncCommitFenced} is {@code true}
     */
    private void maybeThrowFencedInstanceException() {
        if (!this.asyncCommitFenced.get()) {
            return;
        }

        // Fallback used in the message when the real id cannot be determined.
        String instanceId = "unknown";
        if (this.groupMetadata.get().isPresent()) {
            if (this.groupMetadata.get().get().groupInstanceId().isPresent()) {
                instanceId = this.groupMetadata.get().get().groupInstanceId().get();
            } else {
                this.log.error("No group instance ID found although the consumer is fenced. This is a bug!");
            }
        } else {
            this.log.error("No group metadata found although a group ID was provided. This is a bug!");
        }
        throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + instanceId);
    }

    /**
     * @return the {@code SubscriptionState} stored in this consumer's {@code subscriptions} field
     */
    SubscriptionState subscriptions() {
        return subscriptions;
    }

    /**
     * @return {@code true} if the {@code pendingOffsetFetchEvent} field is currently non-null
     */
    boolean hasPendingOffsetFetchEvent() {
        return pendingOffsetFetchEvent != null;
    }

    /**
     * Brings {@code metadataVersionSnapshot} up to the metadata's current update version and, if a pattern
     * subscription is in use, re-evaluates it against the result of {@code metadata.fetch()}.
     *
     * <p>Does nothing when the snapshot is not behind {@code metadata.updateVersion()}.
     */
    private void maybeUpdateSubscriptionMetadata() {
        if (this.metadataVersionSnapshot >= this.metadata.updateVersion()) {
            return;
        }

        this.metadataVersionSnapshot = this.metadata.updateVersion();
        if (this.subscriptions.hasPatternSubscription()) {
            this.updatePatternSubscription(this.metadata.fetch());
        }
    }

    /**
     * Factory that builds a {@code ConsumerMetadata} instance from the supplied collaborators.
     */
    @FunctionalInterface
    interface ConsumerMetadataFactory {
        /**
         * @param config                   consumer configuration
         * @param subscriptions            subscription state
         * @param logContext               logging context
         * @param clusterResourceListeners cluster resource listeners
         * @return the built {@code ConsumerMetadata}
         */
        ConsumerMetadata build(ConsumerConfig config, SubscriptionState subscriptions, LogContext logContext, ClusterResourceListeners clusterResourceListeners);
    }

    /**
     * Factory that builds a {@code FetchCollector} instance from the supplied collaborators.
     *
     * @param <K> key type of the {@code FetchCollector} and {@code Deserializers}
     * @param <V> value type of the {@code FetchCollector} and {@code Deserializers}
     */
    @FunctionalInterface
    interface FetchCollectorFactory<K, V> {
        /**
         * @param logContext     logging context
         * @param metadata       consumer metadata
         * @param subscriptions  subscription state
         * @param fetchConfig    fetch configuration
         * @param deserializers  key/value deserializers
         * @param metricsManager fetch metrics manager
         * @param time           time source
         * @return the built {@code FetchCollector}
         */
        FetchCollector<K, V> build(LogContext logContext, ConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig fetchConfig, Deserializers<K, V> deserializers, FetchMetricsManager metricsManager, Time time);
    }

    /**
     * Factory that builds a {@code CompletableEventReaper} instance.
     */
    @FunctionalInterface
    interface CompletableEventReaperFactory {
        /**
         * @param logContext logging context
         * @return the built {@code CompletableEventReaper}
         */
        CompletableEventReaper build(LogContext logContext);
    }

    /**
     * Factory that builds an {@code ApplicationEventHandler} instance from the supplied collaborators.
     */
    @FunctionalInterface
    interface ApplicationEventHandlerFactory {
        /**
         * @param logContext                       logging context
         * @param time                             time source
         * @param applicationEventQueue            queue of application events
         * @param applicationEventReaper           reaper for completable events
         * @param applicationEventProcessorSupplier supplier of the application event processor
         * @param networkClientDelegateSupplier    supplier of the network client delegate
         * @param requestManagersSupplier          supplier of the request managers
         * @return the built {@code ApplicationEventHandler}
         */
        ApplicationEventHandler build(LogContext logContext, Time time, BlockingQueue<ApplicationEvent> applicationEventQueue, CompletableEventReaper applicationEventReaper, Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier, Supplier<NetworkClientDelegate> networkClientDelegateSupplier, Supplier<RequestManagers> requestManagersSupplier);
    }

    /**
     * Dispatches {@code BackgroundEvent}s according to their {@code type()}.
     *
     * <p>{@code ERROR} events rethrow the error they carry. {@code CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED}
     * events run the requested rebalance listener callback, hand the resulting completion event to the enclosing
     * consumer's {@code applicationEventHandler}, and then rethrow the callback's error if there was one.
     */
    private class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> {
        private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;

        /**
         * @param rebalanceListenerInvoker invoker used to run rebalance listener callbacks
         */
        public BackgroundEventProcessor(ConsumerRebalanceListenerInvoker rebalanceListenerInvoker) {
            this.rebalanceListenerInvoker = rebalanceListenerInvoker;
        }

        /**
         * Routes {@code event} to the overload matching its type.
         *
         * @param event the background event to handle
         * @throws IllegalArgumentException if the event type is not one of the two handled types
         */
        @Override
        public void process(BackgroundEvent event) {
            switch (event.type()) {
                case ERROR:
                    this.process((ErrorEvent) event);
                    break;
                case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
                    this.process((ConsumerRebalanceListenerCallbackNeededEvent) event);
                    break;
                default:
                    throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
            }
        }

        // Private overloads are not overrides, so they carry no @Override annotation.

        /**
         * Rethrows the error carried by {@code event}.
         */
        private void process(ErrorEvent event) {
            throw event.error();
        }

        /**
         * Runs the rebalance listener callback described by {@code event} and reports its completion.
         *
         * <p>The completion event is added to {@code applicationEventHandler} before any callback error is
         * rethrown, so the completion is reported even when the callback failed.
         */
        private void process(ConsumerRebalanceListenerCallbackNeededEvent event) {
            ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = AsyncKafkaConsumer.invokeRebalanceCallbacks(this.rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future());
            AsyncKafkaConsumer.this.applicationEventHandler.add(invokedEvent);
            if (invokedEvent.error().isPresent()) {
                throw invokedEvent.error().get();
            }
        }
    }
}

