/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.BackOffHandler;
import org.springframework.kafka.listener.ConsumerAwareRecordRecoverer;
import org.springframework.kafka.listener.DefaultBackOffHandler;
import org.springframework.kafka.listener.ErrorHandlingUtils;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecoveryStrategy;
import org.springframework.kafka.listener.RetryListener;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;

/**
 * Tracks delivery failures per calling thread and topic-partition, and decides whether a
 * failed record should be redelivered after a back off or handed to the recoverer.
 *
 * <p>Failure state is stored in a map keyed by {@link Thread}; every access in this class
 * goes through {@link Thread#currentThread()}, so each thread only touches its own entry.
 *
 * <p>NOTE(review): the configuration setters are plain, unsynchronized writes; presumably
 * they are expected to be called before records are processed - confirm against callers.
 */
class FailedRecordTracker implements RecoveryStrategy {

    /** Failure state of the record currently being retried, per thread and per partition. */
    private final Map<Thread, Map<TopicPartition, FailedRecord>> failures = new ConcurrentHashMap<>();

    private final ConsumerAwareRecordRecoverer recoverer;

    /** True when the default back off yields no retry at all on its very first call. */
    private final boolean noRetries;

    private final List<RetryListener> retryListeners = new ArrayList<>();

    private final BackOff backOff;

    private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction;

    private final BackOffHandler backOffHandler;

    private boolean resetStateOnRecoveryFailure = true;

    private boolean resetStateOnExceptionChange = true;

    /**
     * Create a tracker that uses a {@link DefaultBackOffHandler}.
     *
     * @param recoverer called once retries are exhausted; when {@code null} the failure is
     * only logged at error level.
     * @param backOff the default back off; must not be {@code null}.
     * @param logger used by the logging recoverer installed when {@code recoverer} is null.
     */
    FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
            LogAccessor logger) {
        this(recoverer, backOff, null, logger);
    }

    /**
     * Create a tracker.
     *
     * @param recoverer called once retries are exhausted; when {@code null} the failure is
     * only logged at error level. A {@link ConsumerAwareRecordRecoverer} is used as-is, any
     * other recoverer is adapted and does not receive the consumer.
     * @param backOff the default back off; must not be {@code null}.
     * @param backOffHandler invoked with each back off interval; when {@code null} a
     * {@link DefaultBackOffHandler} is used.
     * @param logger used by the logging recoverer installed when {@code recoverer} is null.
     */
    FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
            @Nullable BackOffHandler backOffHandler, LogAccessor logger) {
        Assert.notNull(backOff, "'backOff' cannot be null");
        if (recoverer == null) {
            // No recoverer supplied: just report which back off execution ran out.
            this.recoverer = (rec, consumer, thr) -> {
                Map<TopicPartition, FailedRecord> threadFailures = this.failures.get(Thread.currentThread());
                FailedRecord failedRecord = null;
                if (threadFailures != null) {
                    failedRecord = threadFailures.get(new TopicPartition(rec.topic(), rec.partition()));
                }
                Object backOffState = failedRecord == null ? "none" : failedRecord.getBackOffExecution();
                logger.error(thr, "Backoff " + backOffState + " exhausted for " + KafkaUtils.format(rec));
            };
        }
        else if (recoverer instanceof ConsumerAwareRecordRecoverer) {
            this.recoverer = (ConsumerAwareRecordRecoverer) recoverer;
        }
        else {
            this.recoverer = (rec, consumer, ex) -> recoverer.accept(rec, ex);
        }
        // -1 is the value of BackOffExecution.STOP, i.e. "no further attempts".
        this.noRetries = backOff.start().nextBackOff() == -1L;
        this.backOff = backOff;
        this.backOffHandler = backOffHandler == null ? new DefaultBackOffHandler() : backOffHandler;
    }

    /**
     * Set a function that selects the back off for a given record and exception.
     *
     * @param backOffFunction the function, or {@code null} to always use the default back
     * off; the default is also used whenever the function itself returns {@code null}.
     */
    public void setBackOffFunction(@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
        this.backOffFunction = backOffFunction;
    }

    /**
     * Set whether the tracked state of a record is discarded when its recovery throws.
     *
     * @param resetStateOnRecoveryFailure {@code true} (the initial value) to discard it.
     */
    public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
        this.resetStateOnRecoveryFailure = resetStateOnRecoveryFailure;
    }

    /**
     * Set whether the tracked state of a record is restarted when it fails with a different
     * kind of exception than on the previous attempt.
     *
     * @param resetStateOnExceptionChange {@code true} (the initial value) to restart it.
     */
    public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
        this.resetStateOnExceptionChange = resetStateOnExceptionChange;
    }

    /**
     * Replace the registered retry listeners.
     *
     * @param listeners the new listeners; any previously registered ones are dropped.
     */
    public void setRetryListeners(RetryListener... listeners) {
        this.retryListeners.clear();
        this.retryListeners.addAll(Arrays.asList(listeners));
    }

    /**
     * Return the registered retry listeners.
     *
     * @return the internal (live, mutable) listener list.
     */
    List<RetryListener> getRetryListeners() {
        return this.retryListeners;
    }

    /**
     * Variant of {@link #recovered} without a container or consumer.
     *
     * @param record the failed record.
     * @param exception the failure.
     * @return the result of {@link #recovered}, or {@code false} if the thread was
     * interrupted (the interrupt flag is restored).
     */
    boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
        try {
            return recovered(record, exception, null, null);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Record a failed delivery and either back off or recover the record.
     *
     * @param record the failed record.
     * @param exception the failure.
     * @param container passed through to the back off handler; may be {@code null}.
     * @param consumer passed through to the recoverer; may be {@code null}.
     * @return {@code true} if the recoverer was invoked and returned normally, {@code false}
     * if the back off handler was invoked and the record should be delivered again.
     * @throws InterruptedException propagated from the back off handler.
     */
    @Override
    public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
            @Nullable MessageListenerContainer container, @Nullable Consumer<?, ?> consumer)
            throws InterruptedException {
        if (this.noRetries) {
            // Nothing is tracked in this mode, hence no topic-partition to clean up.
            attemptRecovery(record, exception, null, consumer);
            return true;
        }
        Thread currentThread = Thread.currentThread();
        Map<TopicPartition, FailedRecord> threadFailures =
                this.failures.computeIfAbsent(currentThread, t -> new HashMap<>());
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        FailedRecord failedRecord = getFailedRecordInstance(record, exception, threadFailures, topicPartition);
        this.retryListeners.forEach(
                rl -> rl.failedDelivery(record, exception, failedRecord.getDeliveryAttempts().get()));
        long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
        // -1 is the value of BackOffExecution.STOP, i.e. "no further attempts".
        if (nextBackOff != -1L) {
            this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
            return false;
        }
        attemptRecovery(record, exception, topicPartition, consumer);
        threadFailures.remove(topicPartition);
        if (threadFailures.isEmpty()) {
            this.failures.remove(currentThread);
        }
        return true;
    }

    /**
     * Find or create the tracked state for the record and update it for this failure.
     *
     * <p>A fresh state (with a newly started back off) replaces the existing one when there
     * is none for the partition, when it belongs to a different offset, or when
     * {@code resetStateOnExceptionChange} is set and the previously stored exception is not
     * an instance of the class of the exception returned by
     * {@code ErrorHandlingUtils.findRootCause}. Otherwise the delivery attempt counter of the
     * existing state is incremented.
     */
    private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Exception exception,
            Map<TopicPartition, FailedRecord> map, TopicPartition topicPartition) {
        Exception realException = ErrorHandlingUtils.findRootCause(exception);
        FailedRecord failedRecord = map.get(topicPartition);
        boolean startOver = failedRecord == null
                || failedRecord.getOffset() != record.offset()
                || (this.resetStateOnExceptionChange
                        && !realException.getClass().isInstance(failedRecord.getLastException()));
        if (startOver) {
            failedRecord = new FailedRecord(record.offset(), determineBackOff(record, realException).start());
            map.put(topicPartition, failedRecord);
        }
        else {
            failedRecord.getDeliveryAttempts().incrementAndGet();
        }
        failedRecord.setLastException(realException);
        return failedRecord;
    }

    /**
     * Select the back off for a record: the result of the back off function when one is set
     * and it returns non-null, the default back off otherwise.
     */
    private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) {
        if (this.backOffFunction == null) {
            return this.backOff;
        }
        BackOff selected = this.backOffFunction.apply(record, exception);
        return selected != null ? selected : this.backOff;
    }

    /**
     * Invoke the recoverer and notify the retry listeners of the outcome.
     *
     * <p>If the recoverer (or a listener's {@code recovered} callback) throws a
     * {@link RuntimeException}, the listeners' {@code recoveryFailed} callbacks are invoked,
     * the tracked state is optionally discarded, and the exception is rethrown.
     *
     * @param tp the partition whose state to discard on failure, or {@code null} when
     * nothing is tracked.
     * @param consumer passed through to the recoverer; may be {@code null}.
     */
    private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp,
            @Nullable Consumer<?, ?> consumer) {
        try {
            this.recoverer.accept(record, consumer, exception);
            this.retryListeners.forEach(rl -> rl.recovered(record, exception));
        }
        catch (RuntimeException e) {
            this.retryListeners.forEach(rl -> rl.recoveryFailed(record, exception, e));
            if (tp != null && this.resetStateOnRecoveryFailure) {
                Thread currentThread = Thread.currentThread();
                Map<TopicPartition, FailedRecord> threadFailures = this.failures.get(currentThread);
                // The thread's state may already be gone (e.g. clearThreadState() was called
                // while recovering); a NullPointerException here would hide the real failure.
                if (threadFailures != null) {
                    threadFailures.remove(tp);
                    if (threadFailures.isEmpty()) {
                        // Same cleanup as on the success path: keep no empty per-thread map.
                        this.failures.remove(currentThread);
                    }
                }
            }
            throw e;
        }
    }

    /** Discard all failure state tracked for the calling thread. */
    void clearThreadState() {
        this.failures.remove(Thread.currentThread());
    }

    /**
     * Return the recoverer in use.
     *
     * @return the recoverer: the supplied one, an adapter around it, or the logging default.
     */
    ConsumerAwareRecordRecoverer getRecoverer() {
        return this.recoverer;
    }

    /**
     * Return the number of the next delivery attempt for a record on the calling thread.
     *
     * @param topicPartitionOffset identifies the record.
     * @return 1 when no failure is tracked for that partition and offset on this thread,
     * otherwise the tracked attempt count plus one.
     */
    int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
        Map<TopicPartition, FailedRecord> threadFailures = this.failures.get(Thread.currentThread());
        if (threadFailures == null) {
            return 1;
        }
        FailedRecord failedRecord = threadFailures.get(topicPartitionOffset.getTopicPartition());
        if (failedRecord == null || failedRecord.getOffset() != topicPartitionOffset.getOffset().longValue()) {
            return 1;
        }
        return failedRecord.getDeliveryAttempts().get() + 1;
    }

    /**
     * Failure state of a single record: its offset, the running back off execution, the
     * number of delivery attempts (starting at 1) and the most recent exception.
     */
    static final class FailedRecord {

        private final long offset;

        private final BackOffExecution backOffExecution;

        private final AtomicInteger deliveryAttempts = new AtomicInteger(1);

        private Exception lastException;

        FailedRecord(long offset, BackOffExecution backOffExecution) {
            this.offset = offset;
            this.backOffExecution = backOffExecution;
        }

        long getOffset() {
            return this.offset;
        }

        BackOffExecution getBackOffExecution() {
            return this.backOffExecution;
        }

        AtomicInteger getDeliveryAttempts() {
            return this.deliveryAttempts;
        }

        Exception getLastException() {
            return this.lastException;
        }

        void setLastException(Exception lastException) {
            this.lastException = lastException;
        }
    }
}

