/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.errors;

import com.google.common.base.Throwables;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.metrics.StreamsErrorCollector;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Deserialization exception handler that logs the failure, reports the
 * record's topic to a {@link StreamsErrorCollector}, and then answers
 * {@code CONTINUE} unless the failure's root cause is an HTTP 401/403
 * {@link RestClientException}, in which case it answers {@code FAIL}.
 *
 * <p>{@link #configure(Map)} must be called before {@link #handle}, because it
 * is the only place the error collector is assigned.
 */
public class LogMetricAndContinueExceptionHandler
implements DeserializationExceptionHandler {
    private static final Logger log = LogManager.getLogger(LogMetricAndContinueExceptionHandler.class);

    /** Config key under which the {@link StreamsErrorCollector} instance is passed in. */
    private static final String ERROR_COLLECTOR_CONFIG = "ksql.internal.streams.error.collector";

    /** HTTP status codes that are treated as authorization failures. */
    private static final int HTTP_UNAUTHORIZED = 401;
    private static final int HTTP_FORBIDDEN = 403;

    /** Assigned in {@link #configure(Map)}; {@code null} until then. */
    private StreamsErrorCollector streamsErrorCollector;

    /**
     * Handles a record that could not be deserialized.
     *
     * @param context   processor context; only its task id is read, for logging
     * @param record    the raw record that failed to deserialize
     * @param exception the exception raised during deserialization
     * @return {@code FAIL} when the root cause is a {@link RestClientException}
     *         with status 401 or 403, otherwise {@code CONTINUE}
     */
    public DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
        // Only pay for String.format when the message will actually be emitted;
        // this method runs once per undeserializable record.
        if (log.isDebugEnabled()) {
            log.debug(String.format("Exception caught during Deserialization, taskId: %s, topic: %s, partition: %d, offset: %d", context.taskId(), record.topic(), record.partition(), record.offset()), exception);
        }

        // The error is counted for every failure, including authorization errors.
        this.streamsErrorCollector.recordError(record.topic());

        if (isCausedByAuthorizationError(exception)) {
            log.info(String.format("Authorization error when attempting to access the schema during deserialization. taskId: %s, topic: %s, partition: %d, offset: %d", context.taskId(), record.topic(), record.partition(), record.offset()));
            return DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL;
        }
        return DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE;
    }

    /**
     * Tells whether the root cause of {@code throwable} is a
     * {@link RestClientException} carrying HTTP status 401 or 403.
     *
     * <p>Only the root of the cause chain is inspected; a
     * {@code RestClientException} that itself has a cause is not matched.
     *
     * @param throwable the failure to inspect
     * @return {@code true} for a 401/403 root cause; {@code false} otherwise,
     *         including when the inspection itself throws
     */
    private boolean isCausedByAuthorizationError(Throwable throwable) {
        try {
            Throwable rootCause = Throwables.getRootCause(throwable);
            if (!(rootCause instanceof RestClientException)) {
                return false;
            }
            int status = ((RestClientException) rootCause).getStatus();
            return status == HTTP_UNAUTHORIZED || status == HTTP_FORBIDDEN;
        }
        catch (Throwable ignored) {
            // Deliberately best-effort: any failure while classifying the error
            // (e.g. a null argument to getRootCause) is treated as "not an
            // authorization error" so the handler itself never throws from here.
            return false;
        }
    }

    /**
     * Reads the {@link StreamsErrorCollector} from the supplied configuration.
     *
     * @param configs configuration map; must contain a {@link StreamsErrorCollector}
     *                under {@code "ksql.internal.streams.error.collector"}
     * @throws NullPointerException if the map has no value for that key
     * @throws ClassCastException   if the value is not a {@link StreamsErrorCollector}
     */
    public void configure(Map<String, ?> configs) {
        Object collector = Objects.requireNonNull(
            configs.get(ERROR_COLLECTOR_CONFIG),
            "Missing required config: " + ERROR_COLLECTOR_CONFIG);
        this.streamsErrorCollector = (StreamsErrorCollector) collector;
    }
}

