package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/confluent/parallelconsumer/internal/BrokerPollSystem.class */
public class BrokerPollSystem<K, V> implements OffsetCommitter {
    private final ConsumerManager<K, V> consumerManager;
    private Optional<Future<Boolean>> pollControlThreadFuture;
    private final ParallelEoSStreamProcessor<K, V> pc;
    private Optional<ConsumerOffsetCommitter<K, V>> committer;
    private final WorkManager<K, V> wm;
    private static final Logger log = LoggerFactory.getLogger(BrokerPollSystem.class);
    private static Duration longPollTimeout = Duration.ofMillis(2000);
    private State state = State.running;
    private volatile boolean paused = false;
    private final RateLimiter pauseLimiter = new RateLimiter(1);

    public BrokerPollSystem(ConsumerManager<K, V> consumerManager, WorkManager<K, V> workManager, ParallelEoSStreamProcessor<K, V> parallelEoSStreamProcessor, ParallelConsumerOptions parallelConsumerOptions) {
        this.committer = Optional.empty();
        this.wm = workManager;
        this.pc = parallelEoSStreamProcessor;
        this.consumerManager = consumerManager;
        switch (parallelConsumerOptions.getCommitMode()) {
            case PERIODIC_CONSUMER_SYNC:
            case PERIODIC_CONSUMER_ASYNCHRONOUS:
                this.committer = Optional.of(new ConsumerOffsetCommitter(consumerManager, workManager, parallelConsumerOptions));
                return;
            default:
                return;
        }
    }

    public void start() {
        this.pollControlThreadFuture = Optional.of(Executors.newSingleThreadExecutor().submit(this::controlLoop));
    }

    public void supervise() {
        if (this.pollControlThreadFuture.isPresent()) {
            Future<Boolean> future = this.pollControlThreadFuture.get();
            if (future.isCancelled() || future.isDone()) {
                try {
                    future.get();
                } catch (Exception e) {
                    throw new InternalRuntimeError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
                }
            }
        }
    }

    private boolean controlLoop() {
        Thread.currentThread().setName("pc-broker-poll");
        this.pc.getMyId().ifPresent(str -> {
            MDC.put(ParallelEoSStreamProcessor.MDC_INSTANCE_ID, str);
        });
        log.trace("Broker poll control loop start");
        this.committer.ifPresent(consumerOffsetCommitter -> {
            consumerOffsetCommitter.claim();
        });
        while (this.state != State.closed) {
            try {
                log.trace("Loop: Broker poller: ({})", this.state);
                if (this.state == State.running) {
                    ConsumerRecords<K, V> pollBrokerForRecords = pollBrokerForRecords();
                    log.debug("Got {} records in poll result", Integer.valueOf(pollBrokerForRecords.count()));
                    if (!pollBrokerForRecords.isEmpty()) {
                        log.trace("Loop: Register work");
                        this.wm.registerWork(pollBrokerForRecords);
                        if (!this.wm.hasWorkInFlight()) {
                            log.trace("Apparently no work is being done, make sure Control is awake to receive messages");
                            this.pc.notifyNewWorkRegistered();
                        }
                    }
                }
                maybeDoCommit();
                switch (this.state) {
                    case draining:
                        doPause();
                        transitionToCloseMaybe();
                        break;
                    case closing:
                        doClose();
                        break;
                }
            } catch (Exception e) {
                log.error("Unknown error", e);
                throw e;
            }
        }
        log.debug("Broker poll thread finished, returning true to future");
        return true;
    }

    private void transitionToCloseMaybe() {
        if (isResponsibleForCommits() && !this.wm.isRecordsAwaitingToBeCommitted()) {
            this.state = State.closing;
            return;
        }
        log.trace("Draining, but work still needs to be committed. Yielding thread to avoid busy wait.");
        try {
            Thread.sleep(1L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doClose() {
        doPause();
        maybeCloseConsumer();
        this.state = State.closed;
    }

    private void maybeCloseConsumer() {
        if (isResponsibleForCommits()) {
            log.debug("Closing {}, first closing consumer...", getClass().getSimpleName());
            this.consumerManager.close(DrainingCloseable.DEFAULT_TIMEOUT);
            log.debug("Consumer closed.");
        }
    }

    private boolean isResponsibleForCommits() {
        return this.committer.isPresent();
    }

    private ConsumerRecords<K, V> pollBrokerForRecords() {
        managePauseOfSubscription();
        log.debug("Subscriptions are paused: {}", Boolean.valueOf(this.paused));
        Duration ofMillis = this.state == State.running ? longPollTimeout : Duration.ofMillis(1L);
        log.debug("Long polling broker with timeout {} seconds, might appear to sleep here if subs are paused, or no data available on broker.", Long.valueOf(BackportUtils.toSeconds(ofMillis)));
        return this.consumerManager.poll(ofMillis);
    }

    public void drain() {
        if (this.state != State.draining) {
            log.debug("Signaling poll system to drain, waking up consumer...");
            this.state = State.draining;
            this.consumerManager.wakeup();
        }
    }

    private void doPause() {
        if (this.paused) {
            log.trace("Already paused");
        } else if (this.pauseLimiter.couldPerform()) {
            this.pauseLimiter.performIfNotLimited(() -> {
                this.paused = true;
                log.debug("Pausing subs");
                this.consumerManager.pause(this.consumerManager.assignment());
            });
        } else if (log.isDebugEnabled()) {
            log.debug("Should pause but pause rate limit exceeded {} vs {}. Queued: {}", new Object[]{this.pauseLimiter.getElapsedDuration(), this.pauseLimiter.getRate(), this.wm.getWorkQueuedInMailboxCount()});
        }
    }

    public void closeAndWait() throws TimeoutException, ExecutionException {
        log.debug("Requesting broker polling system to close...");
        transitionToClosing();
        if (this.pollControlThreadFuture.isPresent()) {
            log.debug("Wait for loop to finish ending...");
            Future<Boolean> future = this.pollControlThreadFuture.get();
            boolean z = true;
            while (z) {
                try {
                    z = false;
                    if (!future.get(DrainingCloseable.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS).booleanValue()) {
                        log.warn("Broker poll control thread not closed cleanly.");
                    }
                } catch (InterruptedException e) {
                    log.debug("Interrupted waiting for broker poller thread to finish", e);
                } catch (ExecutionException | TimeoutException e2) {
                    log.error("Execution or timeout exception waiting for broker poller thread to finish", e2);
                    throw e2;
                }
            }
        }
        log.debug("Broker poll system finished closing");
    }

    private void transitionToClosing() {
        log.debug("Poller transitioning to closing, waking up consumer");
        this.state = State.closing;
        this.consumerManager.wakeup();
    }

    private void managePauseOfSubscription() {
        boolean shouldThrottle = shouldThrottle();
        log.trace("Need to throttle: {}", Boolean.valueOf(shouldThrottle));
        if (shouldThrottle) {
            doPause();
        } else {
            resumeIfPaused();
        }
    }

    private void resumeIfPaused() {
        if (this.paused) {
            log.debug("Resuming consumer, waking up");
            this.consumerManager.resume(this.consumerManager.paused());
            this.consumerManager.wakeup();
            this.paused = false;
        }
    }

    private boolean shouldThrottle() {
        return this.wm.shouldThrottle();
    }

    @Override // io.confluent.parallelconsumer.internal.OffsetCommitter
    public void retrieveOffsetsAndCommit() {
        this.committer.orElseThrow(() -> {
            throw new IllegalStateException("No committer configured");
        }).commit();
    }

    private void maybeDoCommit() {
        this.committer.ifPresent((v0) -> {
            v0.maybeDoCommit();
        });
    }

    public void wakeupIfPaused() {
        if (this.paused) {
            this.consumerManager.wakeup();
        }
    }

    public boolean isPaused() {
        return this.paused;
    }

    public static void setLongPollTimeout(Duration duration) {
        longPollTimeout = duration;
    }

    public static Duration getLongPollTimeout() {
        return longPollTimeout;
    }
}
