/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.common.thrift;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.common.base.Closure;
import com.twitter.common.base.Closures;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.net.loadbalancing.LeastConnectedStrategy;
import com.twitter.common.net.loadbalancing.LoadBalancer;
import com.twitter.common.net.loadbalancing.LoadBalancerImpl;
import com.twitter.common.net.loadbalancing.LoadBalancingStrategy;
import com.twitter.common.net.loadbalancing.MarkDeadStrategyWithHostCheck;
import com.twitter.common.net.loadbalancing.RequestTracker;
import com.twitter.common.net.loadbalancing.TrafficMonitorAdapter;
import com.twitter.common.net.monitoring.TrafficMonitor;
import com.twitter.common.net.pool.Connection;
import com.twitter.common.net.pool.ConnectionFactory;
import com.twitter.common.net.pool.ConnectionPool;
import com.twitter.common.net.pool.DynamicHostSet;
import com.twitter.common.net.pool.DynamicPool;
import com.twitter.common.net.pool.MetaPool;
import com.twitter.common.net.pool.ObjectPool;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.quantity.Unit;
import com.twitter.common.stats.Stats;
import com.twitter.common.stats.StatsProvider;
import com.twitter.common.thrift.Thrift;
import com.twitter.common.thrift.ThriftConnectionFactory;
import com.twitter.common.thrift.Util;
import com.twitter.common.util.BackoffDecider;
import com.twitter.common.util.BackoffStrategy;
import com.twitter.common.util.TruncatedBinaryBackoff;
import com.twitter.common.util.concurrent.ForwardingExecutorService;
import com.twitter.thrift.ServiceInstance;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.thrift.async.TAsyncClient;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;

public class ThriftFactory<T> {
    private static final Amount<Long, Time> DEFAULT_DEAD_TARGET_RESTORE_INTERVAL = Amount.of((long)1L, (Unit)Time.SECONDS);
    private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 50;
    private Class<T> serviceInterface;
    private Function<TTransport, T> clientFactory;
    private int maxConnectionsPerEndpoint;
    private Amount<Long, Time> connectionRestoreInterval;
    private boolean framedTransport;
    private LoadBalancingStrategy<InetSocketAddress> loadBalancingStrategy = null;
    private final TrafficMonitor<InetSocketAddress> monitor;
    private Amount<Long, Time> socketTimeout = null;
    private Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback = Closures.noop();
    private StatsProvider statsProvider = Stats.STATS_PROVIDER;
    private String serviceName;
    private boolean sslTransport;

    public static <T> ThriftFactory<T> create(Class<T> serviceInterface) {
        return new ThriftFactory<T>(serviceInterface);
    }

    private ThriftFactory(Class<T> serviceInterface) {
        this.serviceInterface = Thrift.checkServiceInterface(serviceInterface);
        this.maxConnectionsPerEndpoint = 50;
        this.connectionRestoreInterval = DEFAULT_DEAD_TARGET_RESTORE_INTERVAL;
        this.framedTransport = false;
        this.monitor = new TrafficMonitor(serviceInterface.getName());
        this.serviceName = serviceInterface.getEnclosingClass().getSimpleName();
        this.sslTransport = false;
    }

    private void checkBaseState() {
        Preconditions.checkArgument((this.maxConnectionsPerEndpoint > 0 ? 1 : 0) != 0, (String)"Must allow at least 1 connection per endpoint; %s specified", (Object[])new Object[]{this.maxConnectionsPerEndpoint});
    }

    public TrafficMonitor<InetSocketAddress> getMonitor() {
        return this.monitor;
    }

    public Thrift<T> build(Set<InetSocketAddress> backends) {
        this.checkBaseState();
        MorePreconditions.checkNotBlank(backends);
        ManagedThreadPool managedThreadPool = this.createManagedThreadpool(backends.size());
        LoadBalancer<InetSocketAddress> loadBalancer = this.createLoadBalancer();
        Function<TTransport, T> clientFactory = this.getClientFactory();
        ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = this.createConnectionPool(backends, loadBalancer, (Closure<Collection<InetSocketAddress>>)managedThreadPool, false);
        return new Thrift<T>((ExecutorService)((Object)managedThreadPool), connectionPool, (RequestTracker<InetSocketAddress>)loadBalancer, this.serviceName, this.serviceInterface, clientFactory, false, this.sslTransport);
    }

    public Thrift<T> build(DynamicHostSet<ServiceInstance> hostSet) throws ThriftFactoryException {
        this.checkBaseState();
        Preconditions.checkNotNull(hostSet);
        ManagedThreadPool managedThreadPool = this.createManagedThreadpool(1);
        LoadBalancer<InetSocketAddress> loadBalancer = this.createLoadBalancer();
        Function<TTransport, T> clientFactory = this.getClientFactory();
        ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = this.createConnectionPool(hostSet, loadBalancer, (Closure<Collection<InetSocketAddress>>)managedThreadPool, false);
        return new Thrift<T>((ExecutorService)((Object)managedThreadPool), connectionPool, (RequestTracker<InetSocketAddress>)loadBalancer, this.serviceName, this.serviceInterface, clientFactory, false, this.sslTransport);
    }

    private ManagedThreadPool createManagedThreadpool(int initialEndpointCount) {
        return new ManagedThreadPool(this.serviceName, initialEndpointCount, this.maxConnectionsPerEndpoint);
    }

    public Thrift<T> buildAsync(Set<InetSocketAddress> backends) throws ThriftFactoryException {
        this.checkBaseState();
        MorePreconditions.checkNotBlank(backends);
        LoadBalancer<InetSocketAddress> loadBalancer = this.createLoadBalancer();
        Closure noop = Closures.noop();
        Function<TTransport, T> asyncClientFactory = this.getAsyncClientFactory();
        ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = this.createConnectionPool(backends, loadBalancer, (Closure<Collection<InetSocketAddress>>)noop, true);
        return new Thrift<T>(connectionPool, (RequestTracker<InetSocketAddress>)loadBalancer, this.serviceName, this.serviceInterface, asyncClientFactory, true);
    }

    public Thrift<T> buildAsync(DynamicHostSet<ServiceInstance> hostSet) throws ThriftFactoryException {
        this.checkBaseState();
        Preconditions.checkNotNull(hostSet);
        LoadBalancer<InetSocketAddress> loadBalancer = this.createLoadBalancer();
        Closure noop = Closures.noop();
        Function<TTransport, T> asyncClientFactory = this.getAsyncClientFactory();
        ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool = this.createConnectionPool(hostSet, loadBalancer, (Closure<Collection<InetSocketAddress>>)noop, true);
        return new Thrift<T>(connectionPool, (RequestTracker<InetSocketAddress>)loadBalancer, this.serviceName, this.serviceInterface, asyncClientFactory, true);
    }

    private Function<TTransport, T> getClientFactory() {
        return this.clientFactory == null ? ThriftFactory.createClientFactory(this.serviceInterface) : this.clientFactory;
    }

    private Function<TTransport, T> getAsyncClientFactory() throws ThriftFactoryException {
        try {
            return this.clientFactory == null ? this.createAsyncClientFactory(this.serviceInterface) : this.clientFactory;
        }
        catch (IOException e) {
            throw new ThriftFactoryException("Failed to create async client factory.", e);
        }
    }

    private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool(Set<InetSocketAddress> backends, LoadBalancer<InetSocketAddress> loadBalancer, Closure<Collection<InetSocketAddress>> onBackendsChosen, boolean nonblocking) {
        ImmutableMap.Builder backendBuilder = ImmutableMap.builder();
        for (InetSocketAddress backend : backends) {
            backendBuilder.put((Object)backend, this.createConnectionPool(backend, nonblocking));
        }
        return new MetaPool(backendBuilder.build(), loadBalancer, onBackendsChosen, this.connectionRestoreInterval);
    }

    private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool(DynamicHostSet<ServiceInstance> hostSet, LoadBalancer<InetSocketAddress> loadBalancer, Closure<Collection<InetSocketAddress>> onBackendsChosen, final boolean nonblocking) throws ThriftFactoryException {
        Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>> endpointPoolFactory = new Function<InetSocketAddress, ObjectPool<Connection<TTransport, InetSocketAddress>>>(){

            public ObjectPool<Connection<TTransport, InetSocketAddress>> apply(InetSocketAddress endpoint) {
                return ThriftFactory.this.createConnectionPool(endpoint, nonblocking);
            }
        };
        try {
            return new DynamicPool(hostSet, (Function)endpointPoolFactory, loadBalancer, onBackendsChosen, this.connectionRestoreInterval, Util.GET_ADDRESS, Util.IS_ALIVE);
        }
        catch (DynamicHostSet.MonitorException e) {
            throw new ThriftFactoryException("Failed to monitor host set.", e);
        }
    }

    private ObjectPool<Connection<TTransport, InetSocketAddress>> createConnectionPool(InetSocketAddress backend, boolean nonblocking) {
        ThriftConnectionFactory connectionFactory = new ThriftConnectionFactory(backend, this.maxConnectionsPerEndpoint, ThriftConnectionFactory.TransportType.get(this.framedTransport, nonblocking), this.socketTimeout, this.postCreateCallback, this.sslTransport);
        return new ConnectionPool((ConnectionFactory)connectionFactory, this.statsProvider);
    }

    @VisibleForTesting
    public ThriftFactory<T> withClientFactory(Function<TTransport, T> clientFactory) {
        this.clientFactory = (Function)Preconditions.checkNotNull(clientFactory);
        return this;
    }

    public ThriftFactory<T> withSslEnabled() {
        this.sslTransport = true;
        return this;
    }

    public ThriftFactory<T> withMaxConnectionsPerEndpoint(int maxConnectionsPerEndpoint) {
        Preconditions.checkArgument((maxConnectionsPerEndpoint > 0 ? 1 : 0) != 0);
        this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint;
        return this;
    }

    public ThriftFactory<T> withDeadConnectionRestoreInterval(Amount<Long, Time> connectionRestoreInterval) {
        Preconditions.checkNotNull(connectionRestoreInterval);
        Preconditions.checkArgument(((Long)connectionRestoreInterval.getValue() >= 0L ? 1 : 0) != 0, (String)"A negative interval is invalid: %s", (Object[])new Object[]{connectionRestoreInterval});
        this.connectionRestoreInterval = connectionRestoreInterval;
        return this;
    }

    public ThriftFactory<T> useFramedTransport(boolean framedTransport) {
        this.framedTransport = framedTransport;
        return this;
    }

    public ThriftFactory<T> withLoadBalancingStrategy(LoadBalancingStrategy<InetSocketAddress> strategy) {
        this.loadBalancingStrategy = (LoadBalancingStrategy)Preconditions.checkNotNull(strategy);
        return this;
    }

    private LoadBalancer<InetSocketAddress> createLoadBalancer() {
        if (this.loadBalancingStrategy == null) {
            this.loadBalancingStrategy = this.createDefaultLoadBalancingStrategy();
        }
        return LoadBalancerImpl.create((LoadBalancingStrategy)TrafficMonitorAdapter.create(this.loadBalancingStrategy, this.monitor));
    }

    private LoadBalancingStrategy<InetSocketAddress> createDefaultLoadBalancingStrategy() {
        Function<InetSocketAddress, BackoffDecider> backoffFactory = new Function<InetSocketAddress, BackoffDecider>(){

            public BackoffDecider apply(InetSocketAddress socket) {
                TruncatedBinaryBackoff backoffStrategy = new TruncatedBinaryBackoff(Amount.of((long)2L, (Unit)Time.SECONDS), Amount.of((long)10L, (Unit)Time.SECONDS));
                return BackoffDecider.builder((String)socket.toString()).withTolerateFailureRate(0.2).withRequestWindow(Amount.of((long)1L, (Unit)Time.SECONDS)).withSeedSize(5).withStrategy((BackoffStrategy)backoffStrategy).withRecoveryType(BackoffDecider.RecoveryType.FULL_CAPACITY).withStatsProvider(ThriftFactory.this.statsProvider).build();
            }
        };
        return new MarkDeadStrategyWithHostCheck((LoadBalancingStrategy)new LeastConnectedStrategy(), (Function)backoffFactory);
    }

    public ThriftFactory<T> withSocketTimeout(Amount<Long, Time> socketTimeout) {
        this.socketTimeout = (Amount)Preconditions.checkNotNull(socketTimeout);
        Preconditions.checkArgument(((Long)socketTimeout.as((Unit)Time.MILLISECONDS) >= 0L ? 1 : 0) != 0);
        return this;
    }

    public ThriftFactory<T> withPostCreateCallback(Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback) {
        this.postCreateCallback = (Closure)Preconditions.checkNotNull(postCreateCallback);
        return this;
    }

    public ThriftFactory<T> withStatsProvider(StatsProvider statsProvider) {
        this.statsProvider = (StatsProvider)Preconditions.checkNotNull((Object)statsProvider);
        return this;
    }

    public ThriftFactory<T> withServiceName(String serviceName) {
        this.serviceName = MorePreconditions.checkNotBlank((String)serviceName);
        return this;
    }

    private static <T> Function<TTransport, T> createClientFactory(Class<T> serviceInterface) {
        final Constructor<T> implementationConstructor = ThriftFactory.findImplementationConstructor(serviceInterface);
        return new Function<TTransport, T>(){

            public T apply(TTransport transport) {
                try {
                    return implementationConstructor.newInstance(new TBinaryProtocol(transport));
                }
                catch (InstantiationException e) {
                    throw new RuntimeException(e);
                }
                catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
                catch (InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    private <T> Function<TTransport, T> createAsyncClientFactory(Class<T> serviceInterface) throws IOException {
        final TAsyncClientManager clientManager = new TAsyncClientManager();
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                clientManager.stop();
            }
        });
        final Constructor<T> implementationConstructor = ThriftFactory.findAsyncImplementationConstructor(serviceInterface);
        return new Function<TTransport, T>(){

            public T apply(TTransport transport) {
                Preconditions.checkNotNull((Object)transport);
                Preconditions.checkArgument((boolean)(transport instanceof TNonblockingTransport), (Object)("Invalid transport provided to client factory: " + transport.getClass()));
                try {
                    Object client = implementationConstructor.newInstance(new TBinaryProtocol.Factory(), clientManager, transport);
                    if (ThriftFactory.this.socketTimeout != null) {
                        ((TAsyncClient)client).setTimeout(((Long)ThriftFactory.this.socketTimeout.as((Unit)Time.MILLISECONDS)).longValue());
                    }
                    return client;
                }
                catch (InstantiationException e) {
                    throw new RuntimeException(e);
                }
                catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
                catch (InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    private static <T> Constructor<? extends T> findImplementationConstructor(Class<T> serviceInterface) {
        Class<T> implementationClass = ThriftFactory.findImplementationClass(serviceInterface);
        try {
            return implementationClass.getConstructor(TProtocol.class);
        }
        catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Failed to find a single argument TProtocol constructor in service client class: " + implementationClass);
        }
    }

    private static <T> Constructor<? extends T> findAsyncImplementationConstructor(Class<T> serviceInterface) {
        Class<T> implementationClass = ThriftFactory.findImplementationClass(serviceInterface);
        try {
            return implementationClass.getConstructor(TProtocolFactory.class, TAsyncClientManager.class, TNonblockingTransport.class);
        }
        catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Failed to find expected constructor in service client class: " + implementationClass);
        }
    }

    private static <T> Class<? extends T> findImplementationClass(final Class<T> serviceInterface) {
        try {
            return (Class)Iterables.find((Iterable)ImmutableList.copyOf((Object[])serviceInterface.getEnclosingClass().getClasses()), (Predicate)new Predicate<Class<?>>(){

                public boolean apply(Class<?> inner) {
                    return !serviceInterface.equals(inner) && serviceInterface.isAssignableFrom(inner);
                }
            });
        }
        catch (NoSuchElementException e) {
            throw new IllegalArgumentException("Could not find a sibling enclosed implementation of service interface: " + serviceInterface);
        }
    }

    public static class ThriftFactoryException
    extends Exception {
        public ThriftFactoryException(String msg) {
            super(msg);
        }

        public ThriftFactoryException(String msg, Throwable t) {
            super(msg, t);
        }
    }

    private static class ManagedThreadPool
    extends ForwardingExecutorService<ThreadPoolExecutor>
    implements Closure<Collection<InetSocketAddress>> {
        private static final Logger LOG = Logger.getLogger(ManagedThreadPool.class.getName());
        private final int maxConnectionsPerEndpoint;

        private static ThreadPoolExecutor createThreadPool(String serviceName, int initialSize) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Thrift[" + serviceName + "][%d]").setDaemon(true).build();
            return new ThreadPoolExecutor(initialSize, initialSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), threadFactory);
        }

        public ManagedThreadPool(String serviceName, int initialEndpointCount, int maxConnectionsPerEndpoint) {
            super((ExecutorService)ManagedThreadPool.createThreadPool(serviceName, initialEndpointCount * maxConnectionsPerEndpoint));
            this.maxConnectionsPerEndpoint = maxConnectionsPerEndpoint;
            this.setRejectedExecutionHandler(initialEndpointCount);
        }

        private void setRejectedExecutionHandler(int endpointCount) {
            final String message = String.format("All %d x %d connections in use", endpointCount, this.maxConnectionsPerEndpoint);
            ((ThreadPoolExecutor)this.delegate).setRejectedExecutionHandler(new RejectedExecutionHandler(){

                @Override
                public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
                    throw new RejectedExecutionException(message);
                }
            });
        }

        public void execute(Collection<InetSocketAddress> chosenBackends) {
            int backendCount;
            int newPoolSize;
            int previousPoolSize = ((ThreadPoolExecutor)this.delegate).getMaximumPoolSize();
            if (previousPoolSize != (newPoolSize = (backendCount = Math.max(chosenBackends.size(), 1)) * this.maxConnectionsPerEndpoint)) {
                LOG.info(String.format("Re-sizing deadline thread pool from: %d to: %d", previousPoolSize, newPoolSize));
                if (previousPoolSize < newPoolSize) {
                    ((ThreadPoolExecutor)this.delegate).setMaximumPoolSize(newPoolSize);
                    ((ThreadPoolExecutor)this.delegate).setCorePoolSize(newPoolSize);
                } else {
                    ((ThreadPoolExecutor)this.delegate).setCorePoolSize(newPoolSize);
                    ((ThreadPoolExecutor)this.delegate).setMaximumPoolSize(newPoolSize);
                }
                this.setRejectedExecutionHandler(backendCount);
            }
        }
    }
}

