package org.aerogear.kafka.impl;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.CDI;
import javax.enterprise.util.AnnotationLiteral;
import org.aerogear.kafka.DefaultConsumerRebalanceListener;
import org.aerogear.kafka.cdi.annotation.KafkaConfig;
import org.aerogear.kafka.cdi.extension.VerySimpleEnvironmentResolver;
import org.aerogear.kafka.serialization.CafdiSerdes;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.jboss.weld.context.bound.BoundRequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/aerogear/kafka/impl/DelegationKafkaConsumer.class */
public class DelegationKafkaConsumer implements Runnable {
    private Object consumerInstance;
    private KafkaConsumer<?, ?> consumer;
    private List<String> topics;
    private AnnotatedMethod annotatedListenerMethod;
    private ConsumerRebalanceListener consumerRebalanceListener;
    private int numberOfRetries;
    private long retryBackoffMs;
    private Class<?>[] parameterTypes;
    private Type[] genericParameterTypes;
    private ConsumerMode mode;
    private KafkaCdiMetrics metrics;
    private final Logger logger = LoggerFactory.getLogger(DelegationKafkaConsumer.class);
    private final AtomicBoolean running = new AtomicBoolean(Boolean.TRUE.booleanValue());
    private final Properties properties = new Properties();
    private final AtomicBoolean started = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/aerogear/kafka/impl/DelegationKafkaConsumer$ConsumerMode.class */
    public enum ConsumerMode {
        SINGLE,
        ALL
    }

    private ConsumerRebalanceListener createConsumerRebalanceListener(Class<? extends ConsumerRebalanceListener> cls) {
        if (cls.equals(DefaultConsumerRebalanceListener.class)) {
            return new DefaultConsumerRebalanceListener(this.consumer);
        }
        try {
            return cls.getDeclaredConstructor(Consumer.class).newInstance(this.consumer);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            this.logger.error("Could not create desired listener, using DefaultConsumerRebalanceListener", e);
            return new DefaultConsumerRebalanceListener(this.consumer);
        }
    }

    private Class<?> consumerKeyType(Class<?> cls) {
        return this.parameterTypes.length >= 2 ? this.parameterTypes[0] : (this.parameterTypes.length == 1 && ConsumerRecords.class.isAssignableFrom(this.parameterTypes[0])) ? (Class) ((ParameterizedType) this.genericParameterTypes[0]).getActualTypeArguments()[0] : cls;
    }

    private Class<?> consumerValueType() {
        return this.parameterTypes.length >= 2 ? this.parameterTypes[1] : (this.parameterTypes.length == 1 && ConsumerRecords.class.isAssignableFrom(this.parameterTypes[0])) ? (Class) ((ParameterizedType) this.genericParameterTypes[0]).getActualTypeArguments()[1] : this.parameterTypes[0];
    }

    private <K, V> void createKafkaConsumer(Class<K> cls, Class<V> cls2, Properties properties) {
        this.consumer = new KafkaConsumer<>(properties, CafdiSerdes.serdeFrom(cls).deserializer(), CafdiSerdes.serdeFrom(cls2).deserializer());
    }

    public void initialize(String str, AnnotatedMethod annotatedMethod, BeanManager beanManager, KafkaConfig kafkaConfig) {
        org.aerogear.kafka.cdi.annotation.Consumer consumer = (org.aerogear.kafka.cdi.annotation.Consumer) annotatedMethod.getAnnotation(org.aerogear.kafka.cdi.annotation.Consumer.class);
        this.topics = (List) Arrays.stream(consumer.topics()).map(VerySimpleEnvironmentResolver::resolveVariables).collect(Collectors.toList());
        this.numberOfRetries = IntStream.of(consumer.retries(), kafkaConfig.defaultConsumerRetries()).filter(i -> {
            return i > 0;
        }).findFirst().orElse(0);
        this.retryBackoffMs = IntStream.of(consumer.retryBackoffMs(), kafkaConfig.defaultConsumerRetryBackoffMs()).filter(i2 -> {
            return i2 > 0;
        }).findFirst().orElse(0);
        String resolveVariables = VerySimpleEnvironmentResolver.resolveVariables(consumer.groupId());
        Class<?> keyType = consumer.keyType();
        this.annotatedListenerMethod = annotatedMethod;
        this.parameterTypes = this.annotatedListenerMethod.getJavaMember().getParameterTypes();
        this.genericParameterTypes = this.annotatedListenerMethod.getJavaMember().getGenericParameterTypes();
        this.mode = getConsumerMode(this.parameterTypes);
        Class<?> consumerKeyType = consumerKeyType(keyType);
        Class<?> consumerValueType = consumerValueType();
        this.properties.put("bootstrap.servers", str);
        this.properties.put("group.id", resolveVariables);
        IntStream.of(consumer.fetchMaxWaitMs(), kafkaConfig.defaultFetchMaxWaitMs()).filter(i3 -> {
            return i3 > 0;
        }).findFirst().ifPresent(i4 -> {
            this.properties.put("fetch.max.wait.ms", Integer.valueOf(i4));
        });
        IntStream.of(consumer.requestTimeoutMs(), kafkaConfig.defaultRequestTimeoutMs()).filter(i5 -> {
            return i5 > 0;
        }).findFirst().ifPresent(i6 -> {
            this.properties.put("request.timeout.ms", Integer.valueOf(i6));
        });
        this.properties.put("auto.offset.reset", consumer.offset());
        this.properties.put("key.deserializer", CafdiSerdes.serdeFrom(consumerKeyType).deserializer().getClass());
        this.properties.put("value.deserializer", CafdiSerdes.serdeFrom(consumerValueType).deserializer().getClass());
        createKafkaConsumer(consumerKeyType, consumerValueType, this.properties);
        this.consumerRebalanceListener = createConsumerRebalanceListener(consumer.consumerRebalanceListener());
        Bean resolve = beanManager.resolve(beanManager.getBeans(this.annotatedListenerMethod.getJavaMember().getDeclaringClass(), new Annotation[0]));
        this.consumerInstance = beanManager.getReference(resolve, this.annotatedListenerMethod.getJavaMember().getDeclaringClass(), beanManager.createCreationalContext(resolve));
        Bean resolve2 = beanManager.resolve(beanManager.getBeans(KafkaCdiMetrics.class, new Annotation[]{new AnnotationLiteral<Any>() { // from class: org.aerogear.kafka.impl.DelegationKafkaConsumer.1
        }}));
        this.metrics = (KafkaCdiMetrics) beanManager.getReference(resolve2, KafkaCdiMetrics.class, beanManager.createCreationalContext(resolve2));
        this.metrics.consumerCreated();
    }

    @Override // java.lang.Runnable
    public void run() {
        BoundRequestContext boundRequestContext;
        ConcurrentHashMap concurrentHashMap;
        try {
            try {
                try {
                    this.consumer.subscribe(this.topics, this.consumerRebalanceListener);
                    this.logger.info("subscribed to {}", this.topics);
                    loop0: while (isRunning()) {
                        long currentTimeMillis = System.currentTimeMillis();
                        ConsumerRecords<?, ?> poll = this.consumer.poll(Duration.ofMillis(100L));
                        logSlowPoll(currentTimeMillis, System.currentTimeMillis());
                        if (!this.started.getAndSet(true)) {
                            this.metrics.consumerStarted();
                        }
                        Iterator it = poll.iterator();
                        while (true) {
                            if (it.hasNext()) {
                                ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord) it.next();
                                BeanManager beanManager = CDI.current().getBeanManager();
                                Bean resolve = beanManager.resolve(beanManager.getBeans(BoundRequestContext.class, new Annotation[0]));
                                CreationalContext createCreationalContext = beanManager.createCreationalContext((Contextual) null);
                                int i = 0;
                                boolean z = false;
                                do {
                                    try {
                                        try {
                                            boundRequestContext = (BoundRequestContext) beanManager.getReference(resolve, BoundRequestContext.class, createCreationalContext);
                                            concurrentHashMap = new ConcurrentHashMap();
                                            CdiRequestScopeUtils.start(boundRequestContext, concurrentHashMap);
                                            try {
                                            } catch (Throwable th) {
                                                CdiRequestScopeUtils.end(boundRequestContext, concurrentHashMap);
                                                throw th;
                                                break loop0;
                                            }
                                        } catch (Throwable th2) {
                                            createCreationalContext.release();
                                            throw th2;
                                        }
                                    } catch (IllegalAccessException e) {
                                        this.logger.error("error dispatching received value to consumer", e);
                                    } catch (InvocationTargetException e2) {
                                        if (i == this.numberOfRetries) {
                                            this.logger.error(String.format("error dispatching received value to consumer, giving up after run %d/%d", Integer.valueOf(i + 1), Integer.valueOf(this.numberOfRetries)), e2);
                                        } else {
                                            this.logger.warn(String.format("failed on run %d/%d, will retry: %s", Integer.valueOf(i + 1), Integer.valueOf(this.numberOfRetries), e2.toString()));
                                            sleepRetryBackoffMs();
                                        }
                                        i++;
                                    } catch (SerializationException e3) {
                                        this.logger.error("failed to deserialize message, giving up", e3);
                                    }
                                    if (this.mode != ConsumerMode.SINGLE) {
                                        dispatchCompletePayload(poll);
                                        CdiRequestScopeUtils.end(boundRequestContext, concurrentHashMap);
                                        createCreationalContext.release();
                                        break;
                                    } else {
                                        dispatchSinglePayload(consumerRecord);
                                        z = true;
                                        CdiRequestScopeUtils.end(boundRequestContext, concurrentHashMap);
                                        this.logger.trace("dispatched payload {} to consumer", consumerRecord.value());
                                        if (z) {
                                            break;
                                        }
                                    }
                                } while (i <= this.numberOfRetries);
                                createCreationalContext.release();
                            }
                        }
                    }
                    this.logger.info("Close the consumer.");
                    this.metrics.consumerClosed();
                    this.consumer.close();
                } catch (Throwable th3) {
                    this.logger.info("Close the consumer.");
                    this.metrics.consumerClosed();
                    this.consumer.close();
                    throw th3;
                }
            } catch (WakeupException e4) {
                if (isRunning()) {
                    this.logger.trace("Unexpected WakeupException", e4);
                    throw e4;
                }
                this.logger.info("Close the consumer.");
                this.metrics.consumerClosed();
                this.consumer.close();
            }
        } catch (Throwable th4) {
            this.logger.error("Unexpected fatal error", th4);
            this.logger.info("Close the consumer.");
            this.metrics.consumerClosed();
            this.consumer.close();
        }
    }

    private void logSlowPoll(long j, long j2) {
        long j3 = j2 - j;
        if (j3 > 2000) {
            this.logger.warn("slow kafka poll() took {} ms - longer than warning threshold of {} ms", Long.valueOf(j3), 2000L);
        }
    }

    private void sleepRetryBackoffMs() {
        try {
            Thread.sleep(this.retryBackoffMs);
        } catch (InterruptedException e) {
        }
    }

    private void dispatchSinglePayload(ConsumerRecord<?, ?> consumerRecord) throws IllegalAccessException, InvocationTargetException {
        this.logger.trace("dispatching payload {} to consumer", consumerRecord.value());
        if (this.parameterTypes.length == 3) {
            this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, consumerRecord.key(), consumerRecord.value(), consumerRecord.headers());
        } else if (this.parameterTypes.length == 2) {
            this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, consumerRecord.key(), consumerRecord.value());
        } else {
            this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, consumerRecord.value());
        }
    }

    private void dispatchCompletePayload(ConsumerRecords<?, ?> consumerRecords) throws IllegalAccessException, InvocationTargetException {
        this.logger.trace("dispatching payload {} consumer records to consumer", Integer.valueOf(consumerRecords.count()));
        this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, consumerRecords);
    }

    private ConsumerMode getConsumerMode(Class<?>[] clsArr) {
        if (clsArr.length > 1) {
            return ConsumerMode.SINGLE;
        }
        if (clsArr.length == 1) {
            return clsArr[0].isAssignableFrom(ConsumerRecords.class) ? ConsumerMode.ALL : ConsumerMode.SINGLE;
        }
        throw new IllegalArgumentException("Consumer methods must have at least one parameter.");
    }

    private boolean isRunning() {
        return this.running.get();
    }

    public void shutdown() {
        this.logger.info("Shutting down the consumer.");
        this.running.set(Boolean.FALSE.booleanValue());
        this.consumer.wakeup();
    }
}
