package io.confluent.kafkarest.controllers;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.auth.CloudKafkaRestContext;
import io.confluent.kafkarest.ratelimit.RateLimitExceededException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Qualifier;
import javax.inject.Singleton;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.glassfish.hk2.api.AnnotationLiteral;
import org.glassfish.hk2.api.Factory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/controllers/SimpleConsumeModule.class */
public final class SimpleConsumeModule extends AbstractBinder {

    /* loaded from: input_file:io/confluent/kafkarest/controllers/SimpleConsumeModule$KafkaConsumerProviderImpl.class */
    private static final class KafkaConsumerProviderImpl implements KafkaConsumerProvider {
        private static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaConsumerProviderImpl.class);
        private final Properties baseConsumerConfig;

        @Inject
        KafkaConsumerProviderImpl(KafkaRestContext kafkaRestContext) {
            if (kafkaRestContext instanceof CloudKafkaRestContext) {
                this.baseConsumerConfig = ((CloudKafkaRestContext) kafkaRestContext).getConsumerProperties();
            } else {
                this.baseConsumerConfig = kafkaRestContext.getConfig().getConsumerProperties();
            }
            this.baseConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            this.baseConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            this.baseConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            logger.info("Base consumer config is {}", this.baseConsumerConfig);
        }

        @Override // io.confluent.kafkarest.controllers.KafkaConsumerProvider
        public <K, V> KafkaConsumer<K, V> getConsumer(Properties properties) {
            Properties properties2 = new Properties();
            properties2.putAll(this.baseConsumerConfig);
            for (Map.Entry<K, V> entry : properties.entrySet()) {
                Object put = properties2.put(entry.getKey(), entry.getValue());
                if (logger.isTraceEnabled() && put != null) {
                    logger.trace("Replaced consumer config for key {}: old value: {} -> new value: {}", entry.getKey(), put, entry.getValue());
                }
            }
            return new KafkaConsumer<>(properties2);
        }

        @Override // io.confluent.kafkarest.controllers.KafkaConsumerProvider
        public <K, V> void releaseConsumer(KafkaConsumer<K, V> kafkaConsumer) {
            kafkaConsumer.close();
        }
    }

    @Qualifier
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
    @Retention(RetentionPolicy.RUNTIME)
    /* loaded from: input_file:io/confluent/kafkarest/controllers/SimpleConsumeModule$SimpleConsumeExecutor.class */
    public @interface SimpleConsumeExecutor {
    }

    /* loaded from: input_file:io/confluent/kafkarest/controllers/SimpleConsumeModule$SimpleConsumeExecutorFactory.class */
    private static final class SimpleConsumeExecutorFactory implements Factory<ExecutorService> {
        private static final String SIMPLE_CONSUME_EXECUTOR_MAX_THREADS_CONFIG = "simple_consume.executor.max_threads";
        private static final int SIMPLE_CONSUME_EXECUTOR_MAX_THREADS_DEFAULT = 10;
        private static final Logger log = LoggerFactory.getLogger((Class<?>) SimpleConsumeExecutorFactory.class);
        private final ExecutorService simpleConsumeExecutor;

        @Inject
        public SimpleConsumeExecutorFactory(KafkaRestConfig kafkaRestConfig) {
            int intValue;
            int max = Math.max(Runtime.getRuntime().availableProcessors(), 10);
            if (kafkaRestConfig != null) {
                Object obj = kafkaRestConfig.getOriginalProperties().get(SIMPLE_CONSUME_EXECUTOR_MAX_THREADS_CONFIG);
                if ((obj instanceof Integer) && (intValue = ((Integer) obj).intValue()) > 0) {
                    max = intValue;
                }
            } else {
                log.info("Null config provided for Simple Consume executor");
            }
            log.debug("Initializing Simple Consume executor with maxPoolSize of {}", Integer.valueOf(max));
            this.simpleConsumeExecutor = new ThreadPoolExecutor(1, max, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryBuilder().setNameFormat("simple-consume-thread-%d").build(), (runnable, threadPoolExecutor) -> {
                throw new RateLimitExceededException();
            });
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.glassfish.hk2.api.Factory
        public ExecutorService provide() {
            return this.simpleConsumeExecutor;
        }

        @Override // org.glassfish.hk2.api.Factory
        public void dispose(ExecutorService executorService) {
            this.simpleConsumeExecutor.shutdown();
            try {
                if (!this.simpleConsumeExecutor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    log.debug("Could not shutdown gracefully in 60 seconds, interrupt tasks and shutdown");
                    this.simpleConsumeExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                log.debug("Dispose thread interrupted while waiting, interrupt tasks and shutdown");
                this.simpleConsumeExecutor.shutdownNow();
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/controllers/SimpleConsumeModule$SimpleConsumeExecutorImpl.class */
    private static final class SimpleConsumeExecutorImpl extends AnnotationLiteral<SimpleConsumeExecutor> implements SimpleConsumeExecutor {
        private SimpleConsumeExecutorImpl() {
        }
    }

    @Override // org.glassfish.hk2.utilities.binding.AbstractBinder
    protected void configure() {
        bind(SimpleConsumeManagerImpl.class).to(SimpleConsumeManager.class);
        bind(KafkaConsumerProviderImpl.class).to(KafkaConsumerProvider.class).in(RequestScoped.class);
        bindFactory(SimpleConsumeExecutorFactory.class, Singleton.class).qualifiedBy(new SimpleConsumeExecutorImpl()).to(ExecutorService.class).in(Singleton.class);
    }
}
