package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.class */
public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
    // Sentinel thread id; every constructor visible here seeds currentThread with the same value (-1).
    private static final long NO_CURRENT_THREAD = -1;
    // Entry point used by the public API methods to submit application events (add / addAndGet) and to wake the network thread.
    private final ApplicationEventHandler applicationEventHandler;
    // Source of the timers used by poll, commitSync, unsubscribe and close.
    private final Time time;
    // Receives recordPollStart / recordPollEnd from poll(); closed in close().
    private final KafkaShareConsumerMetrics kafkaShareConsumerMetrics;
    private final AsyncConsumerMetrics asyncConsumerMetrics;
    // Not final: the main constructor assigns it inside its try block and tests it for null in the catch handler.
    private Logger log;
    private final String clientId;
    private final String groupId;
    // Queue of background events; reaped by backgroundEventReaper during close.
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final ShareConsumerImpl<K, V>.BackgroundEventProcessor backgroundEventProcessor;
    private final CompletableEventReaper backgroundEventReaper;
    private final Deserializers<K, V> deserializers;
    // Last non-empty fetch handed back by poll(); starts as ShareFetch.empty().
    private ShareFetch<K, V> currentFetch;
    // null while no acknowledgement commit callback is registered (see setAcknowledgementCommitCallback).
    private AcknowledgementCommitCallbackHandler acknowledgementCommitCallbackHandler;
    // Acknowledgement results appended by BackgroundEventProcessor while a callback handler is registered.
    private final List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements;
    // Initialised to AcknowledgementMode.UNKNOWN by every constructor.
    private AcknowledgementMode acknowledgementMode;
    private final ShareFetchBuffer fetchBuffer;
    private final ShareFetchCollector<K, V> fetchCollector;
    private final SubscriptionState subscriptions;
    private final ConsumerMetadata metadata;
    private final Metrics metrics;
    // Default timeout, in milliseconds, used by commitSync() and unsubscribe().
    private final int defaultApiTimeoutMs;
    // Set to true in the finally block of close(Duration).
    private volatile boolean closed;
    // Empty unless the main constructor obtained a reporter from CommonClientConfigs.telemetryReporter.
    private Optional<ClientTelemetryReporter> clientTelemetryReporter;
    private final WakeupTrigger wakeupTrigger;
    // NOTE(review): currentThread and refCount presumably back acquire()/release(), which are outside this view - confirm.
    private final AtomicLong currentThread;
    private final AtomicInteger refCount;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ShareConsumerImpl$AcknowledgementMode.class */
    /**
     * Acknowledgement mode tracked by the enclosing consumer. Every constructor visible in this file
     * starts the consumer in {@link #UNKNOWN}.
     *
     * <p>NOTE(review): the code that moves between modes is outside this view; the per-constant notes
     * below are inferred from the names only and must be confirmed against that code.
     */
    public enum AcknowledgementMode {
        // Initial value assigned by the constructors.
        UNKNOWN,
        // presumably a transitional state before the mode is settled - TODO confirm
        PENDING,
        // presumably: the application acknowledges records itself via acknowledge(...) - TODO confirm
        EXPLICIT,
        // presumably: delivered records are acknowledged on the application's behalf - TODO confirm
        IMPLICIT
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ShareConsumerImpl$ApplicationEventHandlerFactory.class */
    /**
     * Factory for the {@link ApplicationEventHandler} used by this consumer. The main constructor receives
     * an implementation as a parameter (the public constructor passes {@code ApplicationEventHandler::new}),
     * which allows a different handler to be substituted.
     */
    interface ApplicationEventHandlerFactory {
        /**
         * Builds the handler.
         *
         * @param logContext             log context of the owning consumer
         * @param time                   time source
         * @param blockingQueue          queue of application events
         * @param completableEventReaper reaper for completable application events
         * @param supplier               supplier of the {@link ApplicationEventProcessor}
         * @param supplier2              supplier of the {@link NetworkClientDelegate}
         * @param supplier3              supplier of the {@link RequestManagers}
         * @param asyncConsumerMetrics   metrics object shared with the owning consumer
         * @return the handler instance
         */
        ApplicationEventHandler build(LogContext logContext, Time time, BlockingQueue<ApplicationEvent> blockingQueue, CompletableEventReaper completableEventReaper, Supplier<ApplicationEventProcessor> supplier, Supplier<NetworkClientDelegate> supplier2, Supplier<RequestManagers> supplier3, AsyncConsumerMetrics asyncConsumerMetrics);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ShareConsumerImpl$BackgroundEventProcessor.class */
    /**
     * Handles {@link BackgroundEvent}s on behalf of the enclosing consumer. Only error events and
     * acknowledgement-commit-callback events are accepted; any other type is rejected.
     */
    public class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> {
        public BackgroundEventProcessor() {
        }

        /**
         * Dispatches the event to the handler matching its type.
         *
         * @param event the event to handle
         * @throws IllegalArgumentException if the event type is not one of the two supported types
         */
        @Override // org.apache.kafka.clients.consumer.internals.events.EventProcessor
        public void process(BackgroundEvent event) {
            switch (event.type()) {
                case ERROR:
                    process((ErrorEvent) event);
                    break;
                case SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK:
                    process((ShareAcknowledgementCommitCallbackEvent) event);
                    break;
                default:
                    throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
            }
        }

        /** Rethrows the error carried by the event to the caller. */
        private void process(ErrorEvent failure) {
            throw failure.error();
        }

        /**
         * Queues the acknowledgement results carried by the event, but only while a commit callback
         * handler is registered; otherwise the event is dropped.
         */
        private void process(ShareAcknowledgementCommitCallbackEvent callbackEvent) {
            if (acknowledgementCommitCallbackHandler == null) {
                return;
            }
            completedAcknowledgements.add(callbackEvent.acknowledgementsMap());
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ShareConsumerImpl$ShareFetchCollectorFactory.class */
    /**
     * Factory for the {@link ShareFetchCollector} used by this consumer. The main constructor receives an
     * implementation as a parameter (the public constructor passes {@code ShareFetchCollector::new}).
     *
     * @param <K> record key type
     * @param <V> record value type
     */
    interface ShareFetchCollectorFactory<K, V> {
        /**
         * Builds the collector.
         *
         * @param logContext        log context of the owning consumer
         * @param consumerMetadata  consumer metadata
         * @param subscriptionState subscription state
         * @param fetchConfig       fetch configuration
         * @param deserializers     key/value deserializers
         * @return the collector instance
         */
        ShareFetchCollector<K, V> build(LogContext logContext, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, FetchConfig fetchConfig, Deserializers<K, V> deserializers);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a share consumer from configuration, using the system clock, the default factories and a
     * fresh background event queue.
     *
     * @param consumerConfig consumer configuration
     * @param deserializer   key deserializer
     * @param deserializer2  value deserializer
     * @throws KafkaException if construction fails (thrown by the delegated constructor)
     */
    public ShareConsumerImpl(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        // Diamond instead of a raw LinkedBlockingQueue so the element type is checked against the delegate's parameter.
        this(consumerConfig, deserializer, deserializer2, Time.SYSTEM, ApplicationEventHandler::new, CompletableEventReaper::new, ShareFetchCollector::new, new LinkedBlockingQueue<>());
    }

    /* JADX WARN: Finally extract failed */
    /**
     * Main constructor: builds every collaborator from configuration, with the event handler, the
     * background event reaper and the fetch collector supplied through factories.
     *
     * <p>If anything fails after the logger has been created, {@code close(Duration.ZERO, true)} is
     * invoked before the failure is rethrown wrapped in a {@link KafkaException}.
     *
     * @param consumerConfig                 consumer configuration
     * @param deserializer                   key deserializer
     * @param deserializer2                  value deserializer
     * @param time                           time source
     * @param applicationEventHandlerFactory factory for the application event handler
     * @param completableEventReaperFactory  factory for the background event reaper
     * @param shareFetchCollectorFactory     factory for the fetch collector
     * @param linkedBlockingQueue            background event queue, shared with the background event handler
     * @throws KafkaException if construction fails
     */
    ShareConsumerImpl(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2, Time time, ApplicationEventHandlerFactory applicationEventHandlerFactory, AsyncKafkaConsumer.CompletableEventReaperFactory completableEventReaperFactory, ShareFetchCollectorFactory<K, V> shareFetchCollectorFactory, LinkedBlockingQueue<BackgroundEvent> linkedBlockingQueue) {
        this.acknowledgementMode = AcknowledgementMode.UNKNOWN;
        this.closed = false;
        this.clientTelemetryReporter = Optional.empty();
        this.wakeupTrigger = new WakeupTrigger();
        this.currentThread = new AtomicLong(NO_CURRENT_THREAD);
        this.refCount = new AtomicInteger(0);
        try {
            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(consumerConfig, GroupRebalanceConfig.ProtocolType.SHARE);
            this.clientId = consumerConfig.getString(ConsumerConfig.CLIENT_ID_CONFIG);
            this.groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
            maybeThrowInvalidGroupIdException();
            LogContext logContext = createLogContext(this.clientId, this.groupId);
            this.backgroundEventQueue = linkedBlockingQueue;
            // From this point on the catch handler below will attempt a close on failure.
            this.log = logContext.logger(getClass());
            this.log.debug("Initializing the Kafka share consumer");
            this.defaultApiTimeoutMs = consumerConfig.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
            this.time = time;

            List<MetricsReporter> metricsReporters = CommonClientConfigs.metricsReporters(this.clientId, consumerConfig);
            this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(this.clientId, consumerConfig);
            // The telemetry reporter, when present, is registered alongside the configured metrics reporters.
            this.clientTelemetryReporter.ifPresent(metricsReporters::add);
            this.metrics = ConsumerUtils.createMetrics(consumerConfig, time, metricsReporters);
            this.asyncConsumerMetrics = new AsyncConsumerMetrics(this.metrics);

            this.deserializers = new Deserializers<>(consumerConfig, deserializer, deserializer2);
            this.currentFetch = ShareFetch.empty();
            this.subscriptions = ConsumerUtils.createSubscriptionState(consumerConfig, logContext);
            this.metadata = new ConsumerMetadata(consumerConfig, this.subscriptions, logContext, ClientUtils.configureClusterResourceListeners(this.metrics.reporters(), Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer)));
            this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(consumerConfig));

            ShareFetchMetricsManager shareFetchMetricsManager = ConsumerUtils.createShareFetchMetricsManager(this.metrics);
            ApiVersions apiVersions = new ApiVersions();
            BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
            // The handler publishes into the same queue this consumer reads from.
            BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(linkedBlockingQueue, time, this.asyncConsumerMetrics);
            this.fetchBuffer = new ShareFetchBuffer(logContext);
            ClientTelemetrySender telemetrySender = this.clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null);
            Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time, logContext, this.metadata, consumerConfig, apiVersions, this.metrics, shareFetchMetricsManager.throttleTimeSensor(), telemetrySender, backgroundEventHandler, true, this.asyncConsumerMetrics);
            this.completedAcknowledgements = new LinkedList<>();
            Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventHandler, this.metadata, this.subscriptions, this.fetchBuffer, consumerConfig, groupRebalanceConfig, shareFetchMetricsManager, this.clientTelemetryReporter, this.metrics);
            this.applicationEventHandler = applicationEventHandlerFactory.build(logContext, time, applicationEventQueue, new CompletableEventReaper(logContext), ApplicationEventProcessor.supplier(logContext, this.metadata, this.subscriptions, requestManagersSupplier), networkClientDelegateSupplier, requestManagersSupplier, this.asyncConsumerMetrics);

            this.backgroundEventProcessor = new BackgroundEventProcessor();
            this.backgroundEventReaper = completableEventReaperFactory.build(logContext);
            this.fetchCollector = shareFetchCollectorFactory.build(logContext, this.metadata, this.subscriptions, new FetchConfig(consumerConfig), this.deserializers);
            this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(this.metrics, ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX);

            consumerConfig.logUnused();
            AppInfoParser.registerAppInfo(ConsumerUtils.CONSUMER_JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka share consumer initialized");
        } catch (Throwable th) {
            // Only attempt cleanup once the logger exists; close(Duration, boolean) logs unconditionally.
            if (this.log != null) {
                close(Duration.ZERO, true);
            }
            throw new KafkaException("Failed to construct Kafka share consumer", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Constructor that takes an externally supplied {@link KafkaClient}, {@link SubscriptionState} and
     * {@link ConsumerMetadata} and builds the remaining collaborators itself.
     *
     * <p>The background event handler is wired to the same queue that is stored in
     * {@code backgroundEventQueue}, so events it publishes are visible to this consumer (as in the main
     * constructor).
     *
     * @param logContext        log context
     * @param str               client id
     * @param str2              group id
     * @param consumerConfig    consumer configuration
     * @param deserializer      key deserializer
     * @param deserializer2     value deserializer
     * @param time              time source
     * @param kafkaClient       network client handed to the {@link NetworkClientDelegate}
     * @param subscriptionState subscription state
     * @param consumerMetadata  consumer metadata
     */
    public ShareConsumerImpl(LogContext logContext, String str, String str2, ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2, Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata) {
        this.acknowledgementMode = AcknowledgementMode.UNKNOWN;
        this.closed = false;
        this.clientTelemetryReporter = Optional.empty();
        this.wakeupTrigger = new WakeupTrigger();
        this.currentThread = new AtomicLong(NO_CURRENT_THREAD);
        this.refCount = new AtomicInteger(0);
        this.clientId = str;
        this.groupId = str2;
        this.log = logContext.logger(getClass());
        this.time = time;
        this.metrics = new Metrics(time);
        this.deserializers = new Deserializers<>(consumerConfig, deserializer, deserializer2);
        this.currentFetch = ShareFetch.empty();
        this.subscriptions = subscriptionState;
        this.metadata = consumerMetadata;
        this.defaultApiTimeoutMs = consumerConfig.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
        this.fetchBuffer = new ShareFetchBuffer(logContext);
        this.completedAcknowledgements = new LinkedList<>();

        ShareFetchMetricsManager shareFetchMetricsManager = new ShareFetchMetricsManager(this.metrics, new ShareConsumerMetrics(ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX).shareFetchMetrics);
        this.fetchCollector = new ShareFetchCollector<>(logContext, consumerMetadata, subscriptionState, new FetchConfig(consumerConfig), this.deserializers);
        this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(this.metrics, ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX);
        this.asyncConsumerMetrics = new AsyncConsumerMetrics(this.metrics);

        BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
        // One queue instance for both the producer side (handler) and the consumer side (this object).
        // Previously the handler received a throwaway queue, so its events could never be observed here.
        this.backgroundEventQueue = new LinkedBlockingQueue<>();
        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(this.backgroundEventQueue, time, this.asyncConsumerMetrics);

        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () ->
            new NetworkClientDelegate(time, consumerConfig, logContext, kafkaClient, consumerMetadata, backgroundEventHandler, true, this.asyncConsumerMetrics);
        Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, backgroundEventHandler, consumerMetadata, subscriptionState, this.fetchBuffer, consumerConfig, new GroupRebalanceConfig(consumerConfig, GroupRebalanceConfig.ProtocolType.SHARE), shareFetchMetricsManager, this.clientTelemetryReporter, this.metrics);
        this.applicationEventHandler = new ApplicationEventHandler(logContext, time, applicationEventQueue, new CompletableEventReaper(logContext), ApplicationEventProcessor.supplier(logContext, consumerMetadata, subscriptionState, requestManagersSupplier), networkClientDelegateSupplier, requestManagersSupplier, this.asyncConsumerMetrics);

        this.backgroundEventProcessor = new BackgroundEventProcessor();
        this.backgroundEventReaper = new CompletableEventReaper(logContext);
        consumerConfig.logUnused();
        AppInfoParser.registerAppInfo(ConsumerUtils.CONSUMER_JMX_PREFIX, str, this.metrics, time.milliseconds());
    }

    /**
     * Constructor that accepts every major collaborator ready-made and performs no configuration parsing.
     *
     * <p>{@code completedAcknowledgements} is a mutable list, as in the other constructors, because
     * {@link BackgroundEventProcessor} appends to it whenever a commit callback handler is registered;
     * an immutable list would make that append fail with {@link UnsupportedOperationException}.
     *
     * @param logContext              log context
     * @param str                     client id
     * @param deserializer            key deserializer
     * @param deserializer2           value deserializer
     * @param shareFetchBuffer        fetch buffer
     * @param shareFetchCollector     fetch collector
     * @param time                    time source
     * @param applicationEventHandler application event handler
     * @param blockingQueue           background event queue
     * @param completableEventReaper  background event reaper
     * @param metrics                 metrics registry
     * @param subscriptionState       subscription state
     * @param consumerMetadata        consumer metadata
     * @param i                       default API timeout in milliseconds
     * @param str2                    group id
     */
    ShareConsumerImpl(LogContext logContext, String str, Deserializer<K> deserializer, Deserializer<V> deserializer2, ShareFetchBuffer shareFetchBuffer, ShareFetchCollector<K, V> shareFetchCollector, Time time, ApplicationEventHandler applicationEventHandler, BlockingQueue<BackgroundEvent> blockingQueue, CompletableEventReaper completableEventReaper, Metrics metrics, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, int i, String str2) {
        this.acknowledgementMode = AcknowledgementMode.UNKNOWN;
        this.closed = false;
        this.clientTelemetryReporter = Optional.empty();
        this.wakeupTrigger = new WakeupTrigger();
        this.currentThread = new AtomicLong(NO_CURRENT_THREAD);
        this.refCount = new AtomicInteger(0);
        this.log = logContext.logger(getClass());
        this.subscriptions = subscriptionState;
        this.clientId = str;
        this.groupId = str2;
        this.fetchBuffer = shareFetchBuffer;
        this.fetchCollector = shareFetchCollector;
        this.time = time;
        this.backgroundEventQueue = blockingQueue;
        this.backgroundEventProcessor = new BackgroundEventProcessor();
        this.backgroundEventReaper = completableEventReaper;
        this.metrics = metrics;
        this.metadata = consumerMetadata;
        this.defaultApiTimeoutMs = i;
        this.deserializers = new Deserializers<>(deserializer, deserializer2);
        this.currentFetch = ShareFetch.empty();
        this.applicationEventHandler = applicationEventHandler;
        this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(metrics, ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX);
        // Must be mutable: see the constructor documentation.
        this.completedAcknowledgements = new LinkedList<>();
        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
    }

    /**
     * Returns a read-only view of the currently subscribed topics.
     *
     * @return unmodifiable set wrapping the subscription held by the subscription state
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public Set<String> subscription() {
        acquireAndEnsureOpen();
        try {
            final Set<String> subscribedTopics = this.subscriptions.subscription();
            return Collections.unmodifiableSet(subscribedTopics);
        } finally {
            release();
        }
    }

    /**
     * Subscribes to the given topics. An empty collection is treated as a request to unsubscribe.
     *
     * @param collection topic names to subscribe to
     * @throws IllegalArgumentException if the collection is {@code null} or contains a blank topic name
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void subscribe(Collection<String> collection) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (collection == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (collection.isEmpty()) {
                unsubscribe();
                return;
            }
            // Validate every name before anything is sent to the event handler.
            for (String topic : collection) {
                if (org.apache.kafka.common.utils.Utils.isBlank(topic)) {
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
            }
            this.applicationEventHandler.addAndGet(new ShareSubscriptionChangeEvent(collection));
            this.log.info("Subscribed to topics: {}", String.join(", ", collection));
        } finally {
            release();
        }
    }

    /**
     * Submits an unsubscribe event whose deadline is derived from the default API timeout, and waits
     * for it via {@code addAndGet}.
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void unsubscribe() {
        acquireAndEnsureOpen();
        try {
            final Timer deadlineTimer = this.time.timer(this.defaultApiTimeoutMs);
            final ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(CompletableEvent.calculateDeadlineMs(deadlineTimer));
            this.applicationEventHandler.addAndGet(unsubscribeEvent);
            this.log.info("Unsubscribed all topics");
        } finally {
            release();
        }
    }

    /**
     * Fetches records, waiting up to {@code duration}.
     *
     * <p>Each iteration submits a {@link PollEvent}, processes background events, honours a pending
     * wakeup and then tries to obtain a fetch. The first non-empty fetch becomes {@code currentFetch}
     * and is returned; if the timer expires first, an empty {@link ConsumerRecords} is returned.
     *
     * <p>The poll-end metric is recorded and the consumer is released exactly once on every exit path.
     *
     * @param duration maximum time to wait
     * @return the fetched records, or an empty instance when the timeout elapses
     * @throws IllegalStateException if the consumer has no subscription
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public synchronized ConsumerRecords<K, V> poll(Duration duration) {
        final Timer pollTimer = this.time.timer(duration);
        acquireAndEnsureOpen();
        try {
            handleCompletedAcknowledgements();
            acknowledgeBatchIfImplicitAcknowledgement(true);
            this.kafkaShareConsumerMetrics.recordPollStart(pollTimer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics.");
            }
            do {
                this.applicationEventHandler.add(new PollEvent(pollTimer.currentTimeMs()));
                processBackgroundEvents();
                this.wakeupTrigger.maybeTriggerWakeup();
                final ShareFetch<K, V> fetch = pollForFetches(pollTimer);
                if (!fetch.isEmpty()) {
                    this.currentFetch = fetch;
                    return new ConsumerRecords<>(fetch.records(), Map.of());
                }
                this.metadata.maybeThrowAnyException();
            } while (pollTimer.notExpired());
            return ConsumerRecords.empty();
        } finally {
            // Single cleanup point; a catch-all that repeats these calls could run them twice.
            this.kafkaShareConsumerMetrics.recordPollEnd(pollTimer.currentTimeMs());
            release();
        }
    }

    /**
     * Tries to collect a fetch, waiting on the fetch buffer if nothing is available yet.
     *
     * <p>The wait is bounded by the smaller of the event handler's maximum wait time and the time left
     * on {@code timer}. After the wait, successful or not, {@code timer} is brought up to date and the
     * wakeup trigger's task is cleared, exactly once. Collection runs after that cleanup, so a failure
     * while collecting does not repeat it.
     *
     * @param timer the caller's poll timer; updated with the time spent waiting
     * @return the collected fetch, possibly empty
     * @throws InterruptException if the wait is interrupted (logged at trace level and rethrown)
     */
    private ShareFetch<K, V> pollForFetches(Timer timer) {
        final long waitMs = Math.min(this.applicationEventHandler.maximumTimeToWait(), timer.remainingMs());

        // First attempt also carries any acknowledgements taken from the current fetch.
        final ShareFetch<K, V> immediate = collect(this.currentFetch.takeAcknowledgedRecords());
        if (!immediate.isEmpty()) {
            return immediate;
        }

        final Timer waitTimer = this.time.timer(waitMs);
        this.wakeupTrigger.setShareFetchAction(this.fetchBuffer);
        try {
            this.fetchBuffer.awaitNotEmpty(waitTimer);
        } catch (InterruptException e) {
            this.log.trace("Timeout during fetch", e);
            throw e;
        } finally {
            timer.update(waitTimer.currentTimeMs());
            this.wakeupTrigger.clearTask();
        }
        return collect(Collections.emptyMap());
    }

    /**
     * Returns the fetch to hand to the application and forwards {@code map} to the network thread.
     *
     * <p>If the current fetch still holds records it is reused; otherwise a new fetch is collected from
     * the buffer. When a newly collected fetch is empty, a {@link ShareFetchEvent} carrying {@code map}
     * is submitted. In every other case a {@link ShareAcknowledgeAsyncEvent} is submitted, but only if
     * {@code map} is non-empty. The network thread is woken after each submitted event.
     *
     * @param map acknowledgements to send, keyed by partition
     * @return the current fetch if non-empty, otherwise the newly collected fetch
     */
    private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> map) {
        final boolean reuseCurrent = !this.currentFetch.isEmpty();
        final ShareFetch<K, V> result = reuseCurrent ? this.currentFetch : this.fetchCollector.collect(this.fetchBuffer);

        if (!reuseCurrent && result.isEmpty()) {
            // Nothing buffered: ask for more data, piggybacking the acknowledgements on the fetch.
            this.applicationEventHandler.add(new ShareFetchEvent(map));
            this.applicationEventHandler.wakeupNetworkThread();
        } else if (!map.isEmpty()) {
            this.applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(map));
            this.applicationEventHandler.wakeupNetworkThread();
        }
        return result;
    }

    /**
     * Acknowledges the record with {@link AcknowledgeType#ACCEPT} by delegating to
     * {@link #acknowledge(ConsumerRecord, AcknowledgeType)}.
     *
     * @param consumerRecord the record to acknowledge
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void acknowledge(ConsumerRecord<K, V> consumerRecord) {
        acknowledge(consumerRecord, AcknowledgeType.ACCEPT);
    }

    /**
     * Records an acknowledgement of the given type for a record of the current fetch, after checking
     * via {@code ensureExplicitAcknowledgement()} that explicit acknowledgement is permitted.
     *
     * @param consumerRecord  the record to acknowledge
     * @param acknowledgeType the acknowledgement type to record
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void acknowledge(ConsumerRecord<K, V> consumerRecord, AcknowledgeType acknowledgeType) {
        acquireAndEnsureOpen();
        try {
            ensureExplicitAcknowledgement();
            final ShareFetch<K, V> inFlightFetch = this.currentFetch;
            inFlightFetch.acknowledge(consumerRecord, acknowledgeType);
        } finally {
            release();
        }
    }

    /**
     * Commits pending acknowledgements synchronously, using the default API timeout.
     *
     * @return the per-partition outcome as returned by {@link #commitSync(Duration)}
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
        final Duration defaultTimeout = Duration.ofMillis(this.defaultApiTimeoutMs);
        return commitSync(defaultTimeout);
    }

    /**
     * Commits pending acknowledgements synchronously and reports the outcome per partition.
     *
     * <p>If there is nothing to send, an empty map is returned without contacting the network thread.
     * Otherwise a {@link ShareAcknowledgeSyncEvent} is submitted and its future is awaited; the future
     * is registered with the wakeup trigger for the duration of the wait.
     *
     * @param duration maximum time to wait for the commit
     * @return for each partition, an empty {@link Optional} when no acknowledge exception was recorded,
     *         otherwise an {@link Optional} holding that exception
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration duration) {
        acquireAndEnsureOpen();
        try {
            handleCompletedAcknowledgements();
            acknowledgeBatchIfImplicitAcknowledgement(false);

            final Timer commitTimer = this.time.timer(duration.toMillis());
            final Map<TopicIdPartition, NodeAcknowledgements> pending = acknowledgementsToSend();
            if (pending.isEmpty()) {
                return Collections.emptyMap();
            }

            final ShareAcknowledgeSyncEvent commitEvent = new ShareAcknowledgeSyncEvent(pending, CompletableEvent.calculateDeadlineMs(commitTimer));
            this.applicationEventHandler.add(commitEvent);
            final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitFuture = commitEvent.future();
            this.wakeupTrigger.setActiveTask(commitFuture);
            try {
                final Map<TopicIdPartition, Acknowledgements> completed = ConsumerUtils.getResult(commitFuture);
                final Map<TopicIdPartition, Optional<KafkaException>> outcome = new HashMap<>();
                // A null acknowledge exception means the partition's acknowledgements succeeded.
                completed.forEach((partition, acks) ->
                    outcome.put(partition, Optional.ofNullable(acks.getAcknowledgeException())));
                return outcome;
            } finally {
                this.wakeupTrigger.clearTask();
            }
        } finally {
            release();
        }
    }

    /**
     * Commits pending acknowledgements without waiting for the result. Nothing is submitted when there
     * are no acknowledgements to send.
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void commitAsync() {
        acquireAndEnsureOpen();
        try {
            handleCompletedAcknowledgements();
            acknowledgeBatchIfImplicitAcknowledgement(false);
            final Map<TopicIdPartition, NodeAcknowledgements> pending = acknowledgementsToSend();
            if (pending.isEmpty()) {
                return;
            }
            this.applicationEventHandler.add(new ShareAcknowledgeAsyncEvent(pending));
        } finally {
            release();
        }
    }

    /**
     * Installs, replaces or removes the acknowledgement commit callback.
     *
     * <p>A registration event is sent only when the registered/unregistered state actually changes.
     * Removing the callback also discards any completed acknowledgements that were queued for it.
     *
     * @param acknowledgementCommitCallback the callback to install, or {@code null} to remove the current one
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback acknowledgementCommitCallback) {
        acquireAndEnsureOpen();
        try {
            final boolean alreadyRegistered = this.acknowledgementCommitCallbackHandler != null;

            if (acknowledgementCommitCallback == null) {
                if (alreadyRegistered) {
                    this.applicationEventHandler.add(new ShareAcknowledgementCommitCallbackRegistrationEvent(false));
                }
                this.completedAcknowledgements.clear();
                this.acknowledgementCommitCallbackHandler = null;
                return;
            }

            if (!alreadyRegistered) {
                this.applicationEventHandler.add(new ShareAcknowledgementCommitCallbackRegistrationEvent(true));
            }
            this.acknowledgementCommitCallbackHandler = new AcknowledgementCommitCallbackHandler(acknowledgementCommitCallback);
        } finally {
            release();
        }
    }

    /**
     * Looks up the client instance id through the telemetry reporter.
     *
     * @param duration maximum time to wait for the id
     * @return the client instance id
     * @throws IllegalStateException if no telemetry reporter is present
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public Uuid clientInstanceId(Duration duration) {
        final ClientTelemetryReporter reporter = this.clientTelemetryReporter.orElseThrow(
            () -> new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`."));
        return ClientTelemetryUtils.fetchClientInstanceId(reporter, duration);
    }

    /**
     * Exposes the metrics held by this consumer's metrics registry.
     *
     * @return unmodifiable view of the registry's metrics, keyed by metric name
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public Map<MetricName, ? extends Metric> metrics() {
        final Map<MetricName, ? extends Metric> registered = this.metrics.metrics();
        return Collections.unmodifiableMap(registered);
    }

    /**
     * Forwards a metric to the telemetry reporter, if one is present. Metrics whose name is already
     * present in {@link #metrics()} are skipped with a debug log entry.
     *
     * @param kafkaMetric the metric to register
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void registerMetricForSubscription(KafkaMetric kafkaMetric) {
        if (!metrics().containsKey(kafkaMetric.metricName())) {
            this.clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(kafkaMetric));
            return;
        }
        this.log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", kafkaMetric.metricName());
    }

    /**
     * Asks the telemetry reporter, if one is present, to remove a metric. Metrics whose name is present
     * in {@link #metrics()} are skipped with a debug log entry.
     *
     * @param kafkaMetric the metric to unregister
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void unregisterMetricFromSubscription(KafkaMetric kafkaMetric) {
        if (!metrics().containsKey(kafkaMetric.metricName())) {
            this.clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(kafkaMetric));
            return;
        }
        this.log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", kafkaMetric.metricName());
    }

    /**
     * Closes the consumer, allowing up to 30 seconds for the shutdown.
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final Duration defaultCloseTimeout = Duration.ofSeconds(30L);
        close(defaultCloseTimeout);
    }

    /**
     * Closes the consumer within the given timeout. Calling it on an already closed consumer performs
     * no shutdown work. The consumer is marked closed whether or not the shutdown itself succeeds.
     *
     * @param duration maximum time to spend closing
     * @throws IllegalArgumentException if the timeout is negative
     */
    @Override // org.apache.kafka.clients.consumer.ShareConsumer
    public void close(Duration duration) {
        final long timeoutMs = duration.toMillis();
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        acquire();
        try {
            if (this.closed) {
                return;
            }
            close(duration, false);
        } finally {
            this.closed = true;
            release();
        }
    }

    /**
     * Performs the shutdown sequence, recording the first failure and continuing with the
     * remaining steps so that every resource gets a chance to be closed.
     *
     * <p>Steps, in order: disable wakeups, notify the telemetry reporter that close has started,
     * send pending acknowledgements and leave the group, run the acknowledgement commit callback for
     * completed acknowledgements, close the application event handler with the remaining time, reap
     * outstanding background events, close metrics/deserializers/telemetry reporter, and unregister
     * the app info.
     *
     * @param duration total time budget for the shutdown
     * @param z        when {@code true}, a recorded failure is not rethrown
     * @throws InterruptException if the first recorded failure is an {@code InterruptException}
     * @throws KafkaException     wrapping any other first recorded failure
     */
    private void close(Duration duration, boolean z) {
        this.log.trace("Closing the Kafka consumer");
        final AtomicReference<Throwable> firstException = new AtomicReference<>();
        this.wakeupTrigger.disableWakeups();
        final Timer closeTimer = this.time.timer(duration);
        this.clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose());
        closeTimer.update();

        // Each step below records its failure (if any) into firstException instead of aborting.
        org.apache.kafka.common.utils.Utils.swallow(
            this.log,
            Level.ERROR,
            "Failed to release assignment before closing consumer",
            () -> sendAcknowledgementsAndLeaveGroup(closeTimer, firstException),
            firstException);
        org.apache.kafka.common.utils.Utils.swallow(
            this.log,
            Level.ERROR,
            "Failed invoking acknowledgement commit callback",
            this::handleCompletedAcknowledgements,
            firstException);

        if (this.applicationEventHandler != null) {
            org.apache.kafka.common.utils.Utils.closeQuietly(
                () -> this.applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())),
                "Failed shutting down network thread",
                firstException);
        }
        closeTimer.update();

        if (this.backgroundEventReaper != null && this.backgroundEventQueue != null) {
            this.backgroundEventReaper.reap(this.backgroundEventQueue);
        }

        org.apache.kafka.common.utils.Utils.closeQuietly(this.kafkaShareConsumerMetrics, "kafka share consumer metrics", firstException);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.asyncConsumerMetrics, "kafka async consumer metrics", firstException);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.metrics, "consumer metrics", firstException);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.deserializers, "consumer deserializers", firstException);
        this.clientTelemetryReporter.ifPresent(reporter ->
            org.apache.kafka.common.utils.Utils.closeQuietly(reporter, "consumer telemetry reporter", firstException));
        AppInfoParser.unregisterAppInfo(ConsumerUtils.CONSUMER_JMX_PREFIX, this.clientId, this.metrics);
        this.log.debug("Kafka share consumer has been closed");

        final Throwable failure = firstException.get();
        if (failure != null && !z) {
            if (failure instanceof InterruptException) {
                // Interrupts are surfaced as-is rather than wrapped.
                throw (InterruptException) failure;
            }
            throw new KafkaException("Failed to close Kafka share consumer", failure);
        }
    }

    /**
     * Sends any pending acknowledgements and then submits an unsubscribe event, waiting for it to
     * complete within the time left on {@code timer}.
     *
     * <p>A failure while sending acknowledgements is handled by {@link #completeQuietly} and does
     * not stop the unsubscribe step. A timeout while waiting for the unsubscribe is logged as a
     * warning and not propagated; any other failure propagates. The timer is refreshed after each
     * step regardless of outcome.
     *
     * @param timer           time budget shared by both steps
     * @param atomicReference receives the first failure from the acknowledgement step
     */
    private void sendAcknowledgementsAndLeaveGroup(Timer timer, AtomicReference<Throwable> atomicReference) {
        final String ackFailureMessage = "Failed to send pending acknowledgements with a timeout(ms)=" + timer.timeoutMs();
        completeQuietly(
            () -> this.applicationEventHandler.addAndGet(
                new ShareAcknowledgeOnCloseEvent(acknowledgementsToSend(), CompletableEvent.calculateDeadlineMs(timer))),
            ackFailureMessage,
            atomicReference);
        timer.update();

        final ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(CompletableEvent.calculateDeadlineMs(timer));
        this.applicationEventHandler.add(unsubscribeEvent);
        try {
            processBackgroundEvents(unsubscribeEvent.future(), timer);
            this.log.info("Completed releasing assignment and leaving group to close consumer.");
        } catch (TimeoutException e) {
            this.log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't complete it within {} ms. It will proceed to close.", timer.timeoutMs());
        } finally {
            timer.update();
        }
    }

    /**
     * Requests a wakeup by delegating to the consumer's wakeup trigger.
     */
    @Override
    public void wakeup() {
        wakeupTrigger.wakeup();
    }

    /**
     * Acquires the single-thread access guard and verifies the consumer is still open.
     * If the consumer is closed, the guard is released again before the exception is thrown.
     *
     * @throws IllegalStateException if the consumer has already been closed
     */
    private void acquireAndEnsureOpen() {
        acquire();
        if (!this.closed) {
            return;
        }
        release();
        throw new IllegalStateException("This consumer has already been closed.");
    }

    /**
     * Acquires the access guard that restricts this consumer to one thread at a time.
     *
     * <p>The owning thread's id is kept in {@code currentThread}, with {@code -1} meaning "no owner".
     * The calling thread proceeds if it already owns the guard or can claim it atomically; each
     * successful acquisition increments {@code refCount}, which {@link #release()} decrements.
     *
     * @throws ConcurrentModificationException if another thread currently owns the guard
     * @throws IllegalStateException           if called from inside the user-defined acknowledgement
     *                                         commit callback
     */
    private void acquire() {
        final Thread thread = Thread.currentThread();
        final long threadId = thread.getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            // Capture the owner id for the message; it must be read into its own variable because
            // the exception cannot reference itself while it is being constructed.
            final long otherThreadId = this.currentThread.get();
            throw new ConcurrentModificationException("KafkaShareConsumer is not safe for multi-threaded access. currentThread(name: " + thread.getName() + ", id: " + threadId + ") otherThread(id: " + otherThreadId + ")");
        }
        if (this.acknowledgementCommitCallbackHandler != null && this.acknowledgementCommitCallbackHandler.hasEnteredCallback()) {
            throw new IllegalStateException("KafkaShareConsumer methods are not accessible from user-defined acknowledgement commit callback.");
        }
        this.refCount.incrementAndGet();
    }

    /**
     * Releases one hold on the access guard. When the hold count drops to zero the owner id is
     * reset to {@code -1}, allowing another thread to acquire the guard.
     */
    private void release() {
        if (this.refCount.decrementAndGet() != 0) {
            // Still held by an outer call on the same thread.
            return;
        }
        this.currentThread.set(-1L);
    }

    /**
     * Builds a {@code LogContext} whose prefix identifies the share consumer.
     *
     * @param str  the client id placed in the prefix
     * @param str2 the group id placed in the prefix
     * @return a log context with prefix {@code "[ShareConsumer clientId=<str>, groupId=<str2>] "}
     */
    public static LogContext createLogContext(String str, String str2) {
        final String prefix = "[ShareConsumer clientId=" + str + ", groupId=" + str2 + "] ";
        return new LogContext(prefix);
    }

    /**
     * Verifies that a non-empty group id is configured.
     *
     * @throws InvalidGroupIdException if the group id is {@code null} or empty
     */
    private void maybeThrowInvalidGroupIdException() {
        final boolean hasGroupId = this.groupId != null && !this.groupId.isEmpty();
        if (hasGroupId) {
            return;
        }
        throw new InvalidGroupIdException("You must provide a valid group.id in the consumer configuration.");
    }

    /**
     * Processes queued background events and then hands any completed acknowledgements to the
     * acknowledgement commit callback handler, if one is set.
     *
     * <p>The completed acknowledgements are cleared afterwards even if the handler throws or no
     * handler is set.
     */
    private void handleCompletedAcknowledgements() {
        processBackgroundEvents();
        if (!this.completedAcknowledgements.isEmpty()) {
            try {
                if (this.acknowledgementCommitCallbackHandler != null) {
                    this.acknowledgementCommitCallbackHandler.onComplete(this.completedAcknowledgements);
                }
            } finally {
                this.completedAcknowledgements.clear();
            }
        }
    }

    /**
     * Advances the acknowledgement mode and, when the mode is IMPLICIT, accepts every record of the
     * current fetch.
     *
     * <p>Transitions performed here:
     * <ul>
     *   <li>UNKNOWN becomes PENDING, but only when {@code z} is {@code true};</li>
     *   <li>PENDING becomes IMPLICIT when the current fetch is not empty (for either value of
     *       {@code z}).</li>
     * </ul>
     *
     * @param z enables the UNKNOWN-to-PENDING transition; presumably set when invoked from poll —
     *          TODO confirm against callers
     */
    private void acknowledgeBatchIfImplicitAcknowledgement(boolean z) {
        if (z && this.acknowledgementMode == AcknowledgementMode.UNKNOWN) {
            this.acknowledgementMode = AcknowledgementMode.PENDING;
        } else if (this.acknowledgementMode == AcknowledgementMode.PENDING && !this.currentFetch.isEmpty()) {
            // Records are outstanding and no explicit acknowledgement was made: switch to implicit.
            this.acknowledgementMode = AcknowledgementMode.IMPLICIT;
        }

        if (this.acknowledgementMode == AcknowledgementMode.IMPLICIT) {
            this.currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT);
        }
    }

    /**
     * Takes the acknowledged records from the current fetch.
     *
     * @return the result of {@code currentFetch.takeAcknowledgedRecords()}, keyed by topic-id partition
     */
    private Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsToSend() {
        final Map<TopicIdPartition, NodeAcknowledgements> acknowledged = this.currentFetch.takeAcknowledgedRecords();
        return acknowledged;
    }

    /**
     * Ensures the consumer may be used with explicit acknowledgement, switching a PENDING mode to
     * EXPLICIT. Any other mode not mentioned below is left unchanged.
     *
     * @throws IllegalStateException if the mode is IMPLICIT, or if the mode is still UNKNOWN
     */
    private void ensureExplicitAcknowledgement() {
        if (this.acknowledgementMode == AcknowledgementMode.IMPLICIT) {
            throw new IllegalStateException("Implicit acknowledgement of delivery is being used.");
        }
        if (this.acknowledgementMode == AcknowledgementMode.UNKNOWN) {
            throw new IllegalStateException("Acknowledge called before poll.");
        }
        if (this.acknowledgementMode == AcknowledgementMode.PENDING) {
            this.acknowledgementMode = AcknowledgementMode.EXPLICIT;
        }
    }

    /**
     * Drains the background event queue and processes every drained event.
     *
     * <p>Completable events are registered with the background event reaper before being processed.
     * A failure while handling one event does not stop the remaining events: the first failure is
     * remembered and rethrown after all events were handled and the reaper has run; later failures
     * are only logged as warnings.
     *
     * @return {@code true} if at least one event was drained from the queue
     * @throws KafkaException the first failure encountered, wrapped as a {@code KafkaException} if needed
     */
    private boolean processBackgroundEvents() {
        KafkaException firstError = null;
        final List<BackgroundEvent> drainedEvents = new LinkedList<>();
        this.backgroundEventQueue.drainTo(drainedEvents);

        for (BackgroundEvent event : drainedEvents) {
            try {
                if (event instanceof CompletableEvent) {
                    this.backgroundEventReaper.add((CompletableEvent<?>) event);
                }
                this.backgroundEventProcessor.process(event);
            } catch (Throwable t) {
                final KafkaException wrapped = ConsumerUtils.maybeWrapAsKafkaException(t);
                if (firstError == null) {
                    firstError = wrapped;
                } else {
                    this.log.warn("An error occurred when processing the background event: {}", wrapped.getMessage(), wrapped);
                }
            }
        }

        this.backgroundEventReaper.reap(this.time.milliseconds());
        if (firstError != null) {
            throw firstError;
        }
        return !drainedEvents.isEmpty();
    }

    /**
     * Processes background events repeatedly until {@code future} completes or {@code timer}
     * expires.
     *
     * <p>Each iteration first processes queued background events. If the future is already done its
     * result is returned immediately. If no events were processed in this iteration, the method
     * waits on the future for up to 100 ms so the loop does not spin; a timeout of that short wait
     * is ignored and the loop retries. {@code timer} is refreshed at the end of every iteration,
     * including when returning or when an exception propagates.
     *
     * @param future the future to wait for
     * @param timer  overall time budget for the wait
     * @param <T>    result type of the future
     * @return the future's result, as produced by {@code ConsumerUtils.getResult}
     * @throws TimeoutException if {@code timer} expires before the future completes
     */
    <T> T processBackgroundEvents(Future<T> future, Timer timer) {
        this.log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future);
        do {
            final boolean hadEvents = processBackgroundEvents();
            try {
                if (future.isDone()) {
                    // Done (successfully or not): fetch the result without waiting.
                    final T result = ConsumerUtils.getResult(future);
                    this.log.trace("Future {} completed successfully", future);
                    return result;
                }
                if (!hadEvents) {
                    // Nothing to process right now: wait briefly so the future can complete or
                    // new background events can arrive before the next iteration.
                    final Timer pollInterval = this.time.timer(100L);
                    this.log.trace("Waiting {} ms for future {} to complete", pollInterval.remainingMs(), future);
                    final T result = ConsumerUtils.getResult(future, pollInterval);
                    this.log.trace("Future {} completed successfully", future);
                    return result;
                }
            } catch (TimeoutException ignored) {
                // The short poll timed out; keep looping until the overall timer expires.
            } finally {
                timer.update();
            }
        } while (timer.notExpired());
        this.log.trace("Future {} did not complete within timeout", future);
        throw new TimeoutException("Operation timed out before completion");
    }

    /**
     * Runs the given action without letting exceptions escape.
     *
     * <p>A {@code TimeoutException} is only logged at debug level. Any other {@code Exception} is
     * stored in {@code atomicReference} unless a failure has already been recorded there.
     *
     * @param throwingRunnable the action to run
     * @param str              text inserted into the debug message logged on timeout
     * @param atomicReference  holder for the first recorded failure
     */
    void completeQuietly(Utils.ThrowingRunnable throwingRunnable, String str, AtomicReference<Throwable> atomicReference) {
        try {
            throwingRunnable.run();
        } catch (Exception failure) {
            if (failure instanceof TimeoutException) {
                this.log.debug("Timeout expired before the {} operation could complete.", str);
            } else {
                atomicReference.compareAndSet(null, failure);
            }
        }
    }

    /**
     * Returns the client id held by this consumer.
     *
     * @return the value of the {@code clientId} field
     */
    @Override
    public String clientId() {
        return clientId;
    }

    /**
     * Returns the metrics registry held by this consumer.
     *
     * @return the value of the {@code metrics} field
     */
    @Override
    public Metrics metricsRegistry() {
        return metrics;
    }

    /**
     * Returns the share-consumer metrics object held by this consumer.
     *
     * @return the value of the {@code kafkaShareConsumerMetrics} field
     */
    @Override
    public KafkaShareConsumerMetrics kafkaShareConsumerMetrics() {
        return kafkaShareConsumerMetrics;
    }
}
