/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.kafka;

import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.confluent.controlcenter.ReferenceCountingHolder;
import io.confluent.controlcenter.kafka.ClusterView;
import io.confluent.controlcenter.kafka.ConsumerSupplier;
import io.confluent.controlcenter.kafka.DelegatingConsumer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;

/**
 * A {@link ConsumerSupplier} that caches the consumers created by another supplier,
 * keyed by cluster id.
 *
 * <p>Each cached consumer is wrapped in a {@link ReferenceCountingHolder}. Entries are
 * dropped from the cache when the cache exceeds {@link #DEFAULT_MAX_CACHE_SIZE} entries,
 * when an entry has not been accessed for {@link #DEFAULT_CACHE_EXPIRATION_MS}
 * milliseconds, or when the {@link ClusterView} reports the cluster as updated. On
 * removal the holder's {@code close()} is invoked.
 *
 * @param <K> consumer key type
 * @param <V> consumer value type
 */
public class CachingConsumerSupplier<K, V>
implements ConsumerSupplier<K, V, String> {
    /** Maximum number of cluster entries kept in the cache. */
    private static final int DEFAULT_MAX_CACHE_SIZE = 100;
    /** Idle time (since last access) after which a cached entry expires. */
    private static final long DEFAULT_CACHE_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(15L);
    private final Cache<String, ReferenceCountingHolder<Consumer<K, V>>> cache;
    private final ConsumerSupplier<K, V, String> consumerSupplier;

    /**
     * Creates a caching supplier on top of {@code consumerSupplier}.
     *
     * @param clusterView      source of cluster change notifications; a callback is registered
     *                         that invalidates the cached entry of any updated cluster
     * @param consumerSupplier supplier used to create a consumer on a cache miss
     */
    public CachingConsumerSupplier(ClusterView clusterView, ConsumerSupplier<K, V, String> consumerSupplier) {
        this.consumerSupplier = consumerSupplier;
        this.cache = CacheBuilder.newBuilder()
            .maximumSize(DEFAULT_MAX_CACHE_SIZE)
            .expireAfterAccess(DEFAULT_CACHE_EXPIRATION_MS, TimeUnit.MILLISECONDS)
            .removalListener(new RemovalListener<String, ReferenceCountingHolder<Consumer<K, V>>>(){

                @Override
                public void onRemoval(RemovalNotification<String, ReferenceCountingHolder<Consumer<K, V>>> notification) {
                    // The cache gives up its hold on the entry; the holder decides what that means
                    // for the underlying consumer.
                    notification.getValue().close();
                }
            })
            .build();
        clusterView.registerClusterCallback(new ClusterView.ClusterCallback(){

            @Override
            public void clusterUpdated(String clusterId) {
                // Drop the cached entry so the next request builds a fresh consumer.
                CachingConsumerSupplier.this.cache.invalidate(clusterId);
            }

            @Override
            public void clusterAdded(String clusterId) {
                // Nothing is cached for a newly added cluster yet.
            }
        });
    }

    /**
     * Returns a consumer for the given cluster, creating and caching one via the wrapped
     * supplier if none is cached.
     *
     * <p>The returned object delegates to the cached consumer. Its {@code close} methods do
     * not call {@code close} on the delegate directly; instead, the first call unsubscribes
     * the delegate and decrements the holder's reference count. Subsequent calls are no-ops.
     *
     * @param clusterId id of the cluster to obtain a consumer for
     * @return a delegating consumer backed by the cached consumer
     * @throws RuntimeException if creating the consumer fails; unchecked causes are rethrown
     *                          as-is, checked causes are wrapped
     */
    @Override
    public Consumer<K, V> getConsumer(final String clusterId) {
        try {
            final ReferenceCountingHolder<Consumer<K, V>> holder = this.cache.get(clusterId, new Callable<ReferenceCountingHolder<Consumer<K, V>>>(){

                @Override
                public ReferenceCountingHolder<Consumer<K, V>> call() throws Exception {
                    return new ReferenceCountingHolder<Consumer<K, V>>(CachingConsumerSupplier.this.consumerSupplier.getConsumer(clusterId));
                }
            });
            holder.increment();
            return new DelegatingConsumer<K, V>(holder.get()){
                // Guards against releasing the same reference more than once.
                private final AtomicBoolean closed = new AtomicBoolean(false);

                @Override
                public void close() {
                    this.release();
                }

                @Override
                public void close(long duration, TimeUnit unit) {
                    this.release();
                }

                private void release() {
                    if (this.closed.compareAndSet(false, true)) {
                        try {
                            holder.get().unsubscribe();
                        } finally {
                            // Always give the reference back, even if unsubscribe() throws;
                            // otherwise the count could never reach zero because `closed`
                            // is already set and release() will not run again.
                            holder.decrement();
                        }
                    }
                }
            };
        }
        catch (UncheckedExecutionException uee) {
            Throwables.throwIfUnchecked(uee.getCause());
            throw new RuntimeException(uee);
        }
        catch (ExecutionException ee) {
            throw new RuntimeException(ee.getCause());
        }
    }
}

