/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverPartition;
import reactor.kafka.receiver.internals.AckMode;
import reactor.kafka.receiver.internals.AtmostOnceOffsets;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.receiver.internals.KafkaSchedulers;
import reactor.kafka.receiver.internals.SeekablePartition;

class ConsumerEventLoop<K, V>
implements Sinks.EmitFailureHandler {
    private static final Logger log = LoggerFactory.getLogger((String)ConsumerEventLoop.class.getName());
    final AtomicBoolean isActive = new AtomicBoolean(true);
    final AtmostOnceOffsets atmostOnceOffsets;
    final PollEvent pollEvent;
    final AckMode ackMode;
    final ReceiverOptions<K, V> receiverOptions;
    final Scheduler eventScheduler;
    final CommitEvent commitEvent = new CommitEvent();
    final Predicate<Throwable> isRetriableException;
    private final Disposable periodicCommitDisposable;
    Consumer<K, V> consumer;
    final Sinks.Many<ConsumerRecords<K, V>> sink;
    final AtomicBoolean awaitingTransaction;
    volatile long requested;
    static final AtomicLongFieldUpdater<ConsumerEventLoop> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConsumerEventLoop.class, "requested");

    ConsumerEventLoop(AckMode ackMode, AtmostOnceOffsets atmostOnceOffsets, ReceiverOptions<K, V> receiverOptions, Scheduler eventScheduler, Consumer<K, V> consumer, Predicate<Throwable> isRetriableException, Sinks.Many<ConsumerRecords<K, V>> sink, AtomicBoolean awaitingTransaction) {
        this.ackMode = ackMode;
        this.atmostOnceOffsets = atmostOnceOffsets;
        this.receiverOptions = receiverOptions;
        this.eventScheduler = eventScheduler;
        this.consumer = consumer;
        this.isRetriableException = isRetriableException;
        this.sink = sink;
        this.awaitingTransaction = awaitingTransaction;
        this.pollEvent = new PollEvent();
        eventScheduler.schedule((Runnable)new SubscribeEvent());
        Duration commitInterval = receiverOptions.commitInterval();
        if (!commitInterval.isZero()) {
            switch (ackMode) {
                case AUTO_ACK: 
                case MANUAL_ACK: {
                    this.periodicCommitDisposable = Schedulers.parallel().schedulePeriodically(this.commitEvent::scheduleIfRequired, commitInterval.toMillis(), commitInterval.toMillis(), TimeUnit.MILLISECONDS);
                    break;
                }
                default: {
                    this.periodicCommitDisposable = Disposables.disposed();
                    break;
                }
            }
        } else {
            this.periodicCommitDisposable = Disposables.disposed();
        }
    }

    void onRequest(long toAdd) {
        Operators.addCap(REQUESTED, (Object)this, (long)toAdd);
        this.pollEvent.schedule();
    }

    private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsRevoked {}", partitions);
        if (!partitions.isEmpty()) {
            if (this.ackMode != AckMode.ATMOST_ONCE) {
                this.commitEvent.runIfRequired(true);
            }
            for (java.util.function.Consumer<Collection<ReceiverPartition>> onRevoke : this.receiverOptions.revokeListeners()) {
                onRevoke.accept(this.toSeekable(partitions));
            }
        }
    }

    private Collection<ReceiverPartition> toSeekable(Collection<TopicPartition> partitions) {
        ArrayList<ReceiverPartition> seekableList = new ArrayList<ReceiverPartition>(partitions.size());
        for (TopicPartition partition : partitions) {
            seekableList.add(new SeekablePartition(this.consumer, partition));
        }
        return seekableList;
    }

    Mono<Void> stop() {
        return Mono.defer(() -> {
            log.debug("dispose {}", (Object)this.isActive);
            if (!this.isActive.compareAndSet(true, false)) {
                return Mono.empty();
            }
            this.periodicCommitDisposable.dispose();
            if (this.consumer == null) {
                return Mono.empty();
            }
            return (Mono)Mono.fromRunnable((Runnable)new CloseEvent(this.receiverOptions.closeTimeout())).as(flux -> KafkaSchedulers.isCurrentThreadFromScheduler() ? flux : flux.subscribeOn(this.eventScheduler));
        }).onErrorResume(e -> {
            log.warn("Cancel exception: " + e);
            return Mono.empty();
        });
    }

    public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult result) {
        if (!this.isActive.get()) {
            return false;
        }
        switch (result) {
            case FAIL_NON_SERIALIZED: {
                return true;
            }
            case FAIL_OVERFLOW: {
                LockSupport.parkNanos(10L);
                return true;
            }
        }
        return false;
    }

    private class CloseEvent
    implements Runnable {
        private final long closeEndTimeMillis;

        CloseEvent(Duration timeout) {
            this.closeEndTimeMillis = System.currentTimeMillis() + timeout.toMillis();
        }

        @Override
        public void run() {
            block9: {
                try {
                    if (ConsumerEventLoop.this.consumer == null) break block9;
                    Collection<TopicPartition> manualAssignment = ConsumerEventLoop.this.receiverOptions.assignment();
                    if (manualAssignment != null && !manualAssignment.isEmpty()) {
                        ConsumerEventLoop.this.onPartitionsRevoked(manualAssignment);
                    }
                    int attempts = 3;
                    for (int i = 0; i < attempts; ++i) {
                        try {
                            long timeoutMillis;
                            boolean forceCommit = true;
                            if (ConsumerEventLoop.this.ackMode == AckMode.ATMOST_ONCE) {
                                forceCommit = ConsumerEventLoop.this.atmostOnceOffsets.undoCommitAhead(ConsumerEventLoop.this.commitEvent.commitBatch);
                            }
                            if (ConsumerEventLoop.this.ackMode != AckMode.EXACTLY_ONCE) {
                                ConsumerEventLoop.this.commitEvent.runIfRequired(forceCommit);
                                ConsumerEventLoop.this.commitEvent.waitFor(this.closeEndTimeMillis);
                            }
                            if ((timeoutMillis = this.closeEndTimeMillis - System.currentTimeMillis()) < 0L) {
                                timeoutMillis = 0L;
                            }
                            ConsumerEventLoop.this.consumer.close(Duration.ofMillis(timeoutMillis));
                            ConsumerEventLoop.this.consumer = null;
                            break;
                        }
                        catch (WakeupException e) {
                            if (i != attempts - 1) continue;
                            throw e;
                        }
                    }
                }
                catch (Exception e) {
                    log.error("Unexpected exception during close", (Throwable)e);
                    ConsumerEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            }
        }
    }

    class CommitEvent
    implements Runnable {
        final CommittableBatch commitBatch = new CommittableBatch();
        private final AtomicBoolean isPending = new AtomicBoolean();
        private final AtomicInteger inProgress = new AtomicInteger();
        private final AtomicInteger consecutiveCommitFailures = new AtomicInteger();

        CommitEvent() {
        }

        @Override
        public void run() {
            block11: {
                if (!this.isPending.compareAndSet(true, false)) {
                    return;
                }
                CommittableBatch.CommitArgs commitArgs = this.commitBatch.getAndClearOffsets();
                try {
                    if (commitArgs == null) break block11;
                    if (!commitArgs.offsets().isEmpty()) {
                        switch (ConsumerEventLoop.this.ackMode) {
                            case ATMOST_ONCE: {
                                ConsumerEventLoop.this.consumer.commitSync(commitArgs.offsets());
                                this.handleSuccess(commitArgs, commitArgs.offsets());
                                ConsumerEventLoop.this.atmostOnceOffsets.onCommit(commitArgs.offsets());
                                break;
                            }
                            case EXACTLY_ONCE: {
                                break;
                            }
                            case AUTO_ACK: 
                            case MANUAL_ACK: {
                                this.inProgress.incrementAndGet();
                                try {
                                    ConsumerEventLoop.this.consumer.commitAsync(commitArgs.offsets(), (offsets, exception) -> {
                                        this.inProgress.decrementAndGet();
                                        if (exception == null) {
                                            this.handleSuccess(commitArgs, offsets);
                                        } else {
                                            this.handleFailure(commitArgs, exception);
                                        }
                                    });
                                }
                                catch (Throwable e) {
                                    this.inProgress.decrementAndGet();
                                    throw e;
                                }
                                ConsumerEventLoop.this.pollEvent.schedule();
                            }
                        }
                        break block11;
                    }
                    this.handleSuccess(commitArgs, commitArgs.offsets());
                }
                catch (Exception e) {
                    log.error("Unexpected exception", (Throwable)e);
                    this.handleFailure(commitArgs, e);
                }
            }
        }

        void runIfRequired(boolean force) {
            if (force) {
                this.isPending.set(true);
            }
            if (this.isPending.get()) {
                this.run();
            }
        }

        private void handleSuccess(CommittableBatch.CommitArgs commitArgs, Map<TopicPartition, OffsetAndMetadata> offsets) {
            if (!offsets.isEmpty()) {
                this.consecutiveCommitFailures.set(0);
            }
            if (commitArgs.callbackEmitters() != null) {
                for (MonoSink<Void> emitter : commitArgs.callbackEmitters()) {
                    emitter.success();
                }
            }
        }

        private void handleFailure(CommittableBatch.CommitArgs commitArgs, Exception exception) {
            boolean mayRetry;
            log.warn("Commit failed", (Throwable)exception);
            boolean bl = mayRetry = ConsumerEventLoop.this.isRetriableException.test(exception) && ConsumerEventLoop.this.consumer != null && this.consecutiveCommitFailures.incrementAndGet() < ConsumerEventLoop.this.receiverOptions.maxCommitAttempts();
            if (!mayRetry) {
                List<MonoSink<Void>> callbackEmitters = commitArgs.callbackEmitters();
                if (callbackEmitters != null && !callbackEmitters.isEmpty()) {
                    this.isPending.set(false);
                    this.commitBatch.restoreOffsets(commitArgs, false);
                    for (MonoSink<Void> emitter : callbackEmitters) {
                        emitter.error((Throwable)exception);
                    }
                } else {
                    ConsumerEventLoop.this.sink.emitError((Throwable)exception, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            } else {
                this.commitBatch.restoreOffsets(commitArgs, true);
                log.warn("Commit failed with exception" + exception + ", retries remaining " + (ConsumerEventLoop.this.receiverOptions.maxCommitAttempts() - this.consecutiveCommitFailures.get()));
                this.isPending.set(true);
                ConsumerEventLoop.this.pollEvent.schedule();
            }
        }

        void scheduleIfRequired() {
            if (ConsumerEventLoop.this.isActive.get() && this.isPending.compareAndSet(false, true)) {
                ConsumerEventLoop.this.eventScheduler.schedule((Runnable)this);
            }
        }

        private void waitFor(long endTimeMillis) {
            while (this.inProgress.get() > 0 && endTimeMillis - System.currentTimeMillis() > 0L) {
                ConsumerEventLoop.this.consumer.poll(Duration.ofMillis(1L));
            }
        }
    }

    class PollEvent
    implements Runnable {
        private final Duration pollTimeout;

        PollEvent() {
            this.pollTimeout = ConsumerEventLoop.this.receiverOptions.pollTimeout();
        }

        @Override
        public void run() {
            block8: {
                try {
                    if (ConsumerEventLoop.this.isActive.get()) {
                        ConsumerEventLoop.this.commitEvent.runIfRequired(false);
                        long r = ConsumerEventLoop.this.requested;
                        if (r > 0L) {
                            if (!ConsumerEventLoop.this.awaitingTransaction.get()) {
                                ConsumerEventLoop.this.consumer.resume((Collection)ConsumerEventLoop.this.consumer.assignment());
                            } else {
                                ConsumerEventLoop.this.consumer.pause((Collection)ConsumerEventLoop.this.consumer.assignment());
                                this.schedule();
                            }
                        } else {
                            ConsumerEventLoop.this.consumer.pause((Collection)ConsumerEventLoop.this.consumer.assignment());
                        }
                        ConsumerRecords records = ConsumerEventLoop.this.consumer.poll(this.pollTimeout);
                        if (ConsumerEventLoop.this.isActive.get() && (r > 1L || ConsumerEventLoop.this.commitEvent.inProgress.get() > 0)) {
                            this.schedule();
                        }
                        Operators.produced(REQUESTED, (Object)ConsumerEventLoop.this, (long)1L);
                        ConsumerEventLoop.this.sink.emitNext((Object)records, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                    }
                }
                catch (Exception e) {
                    if (!ConsumerEventLoop.this.isActive.get()) break block8;
                    log.error("Unexpected exception", (Throwable)e);
                    ConsumerEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            }
        }

        void schedule() {
            ConsumerEventLoop.this.eventScheduler.schedule((Runnable)this);
        }
    }

    class SubscribeEvent
    implements Runnable {
        SubscribeEvent() {
        }

        @Override
        public void run() {
            block2: {
                log.info("SubscribeEvent");
                try {
                    ConsumerEventLoop.this.receiverOptions.subscriber(new ConsumerRebalanceListener(){

                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            log.debug("onPartitionsAssigned {}", partitions);
                            if (!partitions.isEmpty()) {
                                for (java.util.function.Consumer<Collection<ReceiverPartition>> onAssign : ConsumerEventLoop.this.receiverOptions.assignListeners()) {
                                    onAssign.accept(ConsumerEventLoop.this.toSeekable(partitions));
                                }
                            }
                        }

                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            ConsumerEventLoop.this.onPartitionsRevoked(partitions);
                        }
                    }).accept(ConsumerEventLoop.this.consumer);
                }
                catch (Exception e) {
                    if (!ConsumerEventLoop.this.isActive.get()) break block2;
                    log.error("Unexpected exception", (Throwable)e);
                    ConsumerEventLoop.this.sink.emitError((Throwable)e, (Sinks.EmitFailureHandler)ConsumerEventLoop.this);
                }
            }
        }
    }
}

