package io.confluent.controlcenter.kafka;

import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.controlcenter.ReferenceCountingHolder;
import io.confluent.controlcenter.rest.TokenCredential;
import io.confluent.controlcenter.util.ProxyUtil;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;

/* loaded from: input_file:io/confluent/controlcenter/kafka/CachingConsumerSupplier.class */
public class CachingConsumerSupplier<K, V> implements ConsumerSupplier<K, V> {
    private static final int DEFAULT_MAX_CACHE_SIZE = 100;
    private static final long DEFAULT_CACHE_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(15);
    private final Cache<TokenCredential, ReferenceCountingHolder<Consumer<K, V>>> cache = CacheBuilder.newBuilder().maximumSize(100).expireAfterAccess(DEFAULT_CACHE_EXPIRATION_MS, TimeUnit.MILLISECONDS).removalListener(new RemovalListener<TokenCredential, ReferenceCountingHolder<Consumer<K, V>>>() { // from class: io.confluent.controlcenter.kafka.CachingConsumerSupplier.1
        @Override // com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<TokenCredential, ReferenceCountingHolder<Consumer<K, V>>> removalNotification) {
            removalNotification.getValue().close();
        }
    }).build();
    private final ConsumerSupplier<K, V> consumerSupplier;

    public CachingConsumerSupplier(ConsumerSupplier<K, V> consumerSupplier) {
        this.consumerSupplier = consumerSupplier;
    }

    @Subscribe
    public void handleClusterChangeEvent(ClusterChangeEvent clusterChangeEvent) {
        this.cache.asMap().keySet().forEach(tokenCredential -> {
            if (tokenCredential.matchesCluster(clusterChangeEvent.getClusterId())) {
                this.cache.invalidate(tokenCredential);
            }
        });
    }

    @Override // io.confluent.controlcenter.kafka.ConsumerSupplier
    public Consumer<K, V> getConsumer(final TokenCredential tokenCredential) {
        try {
            ReferenceCountingHolder<Consumer<K, V>> referenceCountingHolder = this.cache.get(tokenCredential, new Callable<ReferenceCountingHolder<Consumer<K, V>>>() { // from class: io.confluent.controlcenter.kafka.CachingConsumerSupplier.2
                @Override // java.util.concurrent.Callable
                public ReferenceCountingHolder<Consumer<K, V>> call() throws Exception {
                    return new ReferenceCountingHolder<>(CachingConsumerSupplier.this.consumerSupplier.getConsumer(tokenCredential));
                }
            });
            referenceCountingHolder.increment();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            return (Consumer) ProxyUtil.newDelegatingProxy(Consumer.class, referenceCountingHolder.get(), () -> {
                if (atomicBoolean.compareAndSet(false, true)) {
                    ((Consumer) referenceCountingHolder.get()).unsubscribe();
                    referenceCountingHolder.decrement();
                }
            });
        } catch (UncheckedExecutionException e) {
            Throwables.throwIfUnchecked(e.getCause());
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeException(e2.getCause());
        }
    }
}
