package org.apache.kafka.connect.runtime.errors;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.class */
public class RetryWithToleranceOperator implements AutoCloseable {
    public static final long RETRIES_DELAY_MIN_MS = 300;
    private static final Class<? extends Exception> NON_TOLERABLE_EXCEPTION_CLASS;
    private final long errorRetryTimeout;
    private final long errorMaxDelayInMillis;
    private final ToleranceType errorToleranceType;
    private long totalFailures;
    private final Time time;
    private ErrorHandlingMetrics errorHandlingMetrics;
    protected final ProcessingContext context;
    private boolean failNonToleratedException;
    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType = new int[ToleranceType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType[ToleranceType.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType[ToleranceType.ALL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public RetryWithToleranceOperator(long j, long j2, ToleranceType toleranceType, Time time) {
        this(j, j2, toleranceType, time, new ProcessingContext());
    }

    public RetryWithToleranceOperator(long j, long j2, ToleranceType toleranceType, Time time, boolean z) {
        this(j, j2, toleranceType, time, new ProcessingContext());
        this.failNonToleratedException = z;
    }

    RetryWithToleranceOperator(long j, long j2, ToleranceType toleranceType, Time time, ProcessingContext processingContext) {
        this.totalFailures = 0L;
        this.failNonToleratedException = false;
        this.errorRetryTimeout = j;
        this.errorMaxDelayInMillis = j2;
        this.errorToleranceType = toleranceType;
        this.time = time;
        this.context = processingContext;
    }

    public synchronized Future<Void> executeFailed(Stage stage, Class<?> cls, ConsumerRecord<byte[], byte[]> consumerRecord, Throwable th) {
        markAsFailed();
        this.context.consumerRecord(consumerRecord);
        this.context.currentContext(stage, cls);
        this.context.error(th);
        this.errorHandlingMetrics.recordFailure();
        Future<Void> report = this.context.report();
        if (withinToleranceLimits()) {
            return report;
        }
        this.errorHandlingMetrics.recordError();
        throw new ConnectException("Tolerance exceeded in error handler", th);
    }

    public synchronized Future<Void> executeFailed(Stage stage, Class<?> cls, SourceRecord sourceRecord, Throwable th) {
        markAsFailed();
        this.context.sourceRecord(sourceRecord);
        this.context.currentContext(stage, cls);
        this.context.error(th);
        this.errorHandlingMetrics.recordFailure();
        Future<Void> report = this.context.report();
        if (withinToleranceLimits()) {
            return report;
        }
        this.errorHandlingMetrics.recordError();
        throw new ConnectException("Tolerance exceeded in Source Worker error handler", th);
    }

    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> cls) {
        this.context.currentContext(stage, cls);
        if (this.context.failed()) {
            log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
            return null;
        }
        try {
            V v = (V) execAndHandleError(operation, TOLERABLE_EXCEPTIONS.getOrDefault(this.context.stage(), RetriableException.class));
            if (this.context.failed()) {
                this.errorHandlingMetrics.recordError();
                this.context.report();
            }
            return v;
        } catch (Throwable th) {
            if (this.context.failed()) {
                this.errorHandlingMetrics.recordError();
                this.context.report();
            }
            throw th;
        }
    }

    protected <V> V execAndRetry(Operation<V> operation) throws Exception {
        int i = 0;
        long milliseconds = this.time.milliseconds();
        long j = milliseconds + this.errorRetryTimeout;
        while (true) {
            try {
                try {
                    i++;
                    V call = operation.call();
                    this.context.attempt(i);
                    return call;
                } catch (RetriableException e) {
                    log.trace("Caught a retriable exception while executing {} operation with {}", this.context.stage(), this.context.executingClass());
                    this.errorHandlingMetrics.recordFailure();
                    if (!checkRetry(milliseconds)) {
                        log.trace("Can't retry. start={}, attempt={}, deadline={}", new Object[]{Long.valueOf(milliseconds), Integer.valueOf(i), Long.valueOf(j)});
                        this.context.error(e);
                        this.context.attempt(i);
                        return null;
                    }
                    backoff(i, j);
                    if (Thread.currentThread().isInterrupted()) {
                        log.trace("Thread was interrupted. Marking operation as failed.");
                        this.context.error(e);
                        this.context.attempt(i);
                        return null;
                    }
                    this.errorHandlingMetrics.recordRetry();
                    this.context.attempt(i);
                }
            } catch (Throwable th) {
                this.context.attempt(i);
                throw th;
            }
        }
    }

    protected <V> V execAndHandleError(Operation<V> operation, Class<? extends Exception> cls) {
        try {
            V v = (V) execAndRetry(operation);
            if (this.context.failed()) {
                markAsFailed();
                this.errorHandlingMetrics.recordSkipped();
            }
            return v;
        } catch (Exception e) {
            this.errorHandlingMetrics.recordFailure();
            markAsFailed();
            this.context.error(e);
            if (!cls.isAssignableFrom(e.getClass())) {
                throw new ConnectException("Unhandled exception in error handler", e);
            }
            if (this.failNonToleratedException && NON_TOLERABLE_EXCEPTION_CLASS.isAssignableFrom(e.getClass())) {
                throw new ConnectException("Non tolerated exception in error handler", e);
            }
            if (!withinToleranceLimits()) {
                throw new ConnectException("Tolerance exceeded in error handler", e);
            }
            this.errorHandlingMetrics.recordSkipped();
            return null;
        }
    }

    void markAsFailed() {
        this.errorHandlingMetrics.recordErrorTimestamp();
        this.totalFailures++;
    }

    public synchronized boolean withinToleranceLimits() {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType[this.errorToleranceType.ordinal()]) {
            case 1:
                return this.totalFailures <= 0;
            case IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2 /* 2 */:
                return true;
            default:
                throw new ConfigException("Unknown tolerance type: {}", this.errorToleranceType);
        }
    }

    public ToleranceType getErrorToleranceType() {
        return this.errorToleranceType;
    }

    boolean checkRetry(long j) {
        return this.time.milliseconds() - j < this.errorRetryTimeout;
    }

    void backoff(int i, long j) {
        long j2 = 300 << (i - 1);
        if (j2 > this.errorMaxDelayInMillis) {
            j2 = ThreadLocalRandom.current().nextLong(this.errorMaxDelayInMillis);
        }
        if (j2 + this.time.milliseconds() > j) {
            j2 = j - this.time.milliseconds();
        }
        log.debug("Sleeping for {} millis", Long.valueOf(j2));
        this.time.sleep(j2);
    }

    public synchronized void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
        this.errorHandlingMetrics = errorHandlingMetrics;
    }

    public String toString() {
        return "RetryWithToleranceOperator{errorRetryTimeout=" + this.errorRetryTimeout + ", errorMaxDelayInMillis=" + this.errorMaxDelayInMillis + ", errorToleranceType=" + this.errorToleranceType + ", totalFailures=" + this.totalFailures + ", time=" + this.time + ", context=" + this.context + '}';
    }

    public synchronized void reporters(List<ErrorReporter> list) {
        this.context.reporters(list);
    }

    public synchronized void sourceRecord(SourceRecord sourceRecord) {
        this.context.sourceRecord(sourceRecord);
    }

    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        this.context.consumerRecord(consumerRecord);
    }

    public synchronized boolean failed() {
        return this.context.failed();
    }

    public synchronized Throwable error() {
        return this.context.error();
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        this.context.close();
    }

    static {
        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
        NON_TOLERABLE_EXCEPTION_CLASS = ConfigException.class;
    }
}
