package io.confluent.parallelconsumer.internal;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/ConsumerManager.class */
public class ConsumerManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
    private final Consumer<K, V> consumer;
    private final AtomicBoolean pollingBroker = new AtomicBoolean(false);
    private int erroneousWakups = 0;
    private int correctPollWakeups = 0;
    private int noWakeups = 0;
    private boolean commitRequested;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerRecords<K, V> poll(Duration duration) {
        ConsumerRecords<K, V> consumerRecords;
        Duration duration2 = duration;
        try {
            try {
                if (this.commitRequested) {
                    log.debug("Commit requested, so will not long poll as need to perform the commit");
                    duration2 = Duration.ofMillis(1L);
                    this.commitRequested = false;
                }
                this.pollingBroker.set(true);
                consumerRecords = this.consumer.poll(duration2);
                log.debug("Poll completed normally and returned {}...", Integer.valueOf(consumerRecords.count()));
                this.pollingBroker.set(false);
            } catch (WakeupException e) {
                this.correctPollWakeups++;
                log.debug("Awoken from broker poll");
                log.trace("Wakeup caller is:", e);
                consumerRecords = new ConsumerRecords<>(UniMaps.of());
                this.pollingBroker.set(false);
            }
            return consumerRecords;
        } catch (Throwable th) {
            this.pollingBroker.set(false);
            throw th;
        }
    }

    public void wakeup() {
        if (this.pollingBroker.get()) {
            log.debug("Waking up consumer");
            this.consumer.wakeup();
        }
    }

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        boolean z = true;
        this.noWakeups++;
        while (z) {
            try {
                this.consumer.commitSync(map);
                z = false;
            } catch (WakeupException e) {
                log.debug("Got woken up, retry. errors: " + this.erroneousWakups + " none: " + this.noWakeups + " correct:" + this.correctPollWakeups, e);
                this.erroneousWakups++;
            }
        }
    }

    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        boolean z = true;
        this.noWakeups++;
        while (z) {
            try {
                this.consumer.commitAsync(map, offsetCommitCallback);
                z = false;
            } catch (WakeupException e) {
                log.debug("Got woken up, retry. errors: " + this.erroneousWakups + " none: " + this.noWakeups + " correct:" + this.correctPollWakeups, e);
                this.erroneousWakups++;
            }
        }
    }

    public ConsumerGroupMetadata groupMetadata() {
        return this.consumer.groupMetadata();
    }

    public void close(Duration duration) {
        this.consumer.close(duration);
    }

    public Set<TopicPartition> assignment() {
        return this.consumer.assignment();
    }

    public void pause(Set<TopicPartition> set) {
        this.consumer.pause(set);
    }

    public Set<TopicPartition> paused() {
        return this.consumer.paused();
    }

    public void resume(Set<TopicPartition> set) {
        this.consumer.resume(set);
    }

    public void onCommitRequested() {
        this.commitRequested = true;
    }

    public ConsumerManager(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }
}
