/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.mqtt.network.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.PipelineFactory;
import io.confluent.mqtt.ProxyFactory;
import io.confluent.mqtt.ProxyServer;
import io.confluent.mqtt.network.netty.NettyProxyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import java.util.concurrent.ThreadFactory;

/**
 * {@link ProxyFactory} that assembles a Netty {@link ServerBootstrap} from an
 * {@link MqttConfig} and wraps it in a {@link NettyProxyServer}.
 *
 * <p>Two event loop groups are created per call to {@link #newProxy(PipelineFactory)}:
 * one is installed as the bootstrap's group, the other is used as the executor for the
 * Kafka publish handler that is appended to every child channel's pipeline.
 */
public class NettyProxyFactory implements ProxyFactory {
    private final MqttConfig config;

    /**
     * Creates a factory backed by the given configuration.
     *
     * @param config supplies the thread counts, the epoll flag and the bind host/port
     *               that are read when a proxy is built
     */
    public NettyProxyFactory(MqttConfig config) {
        this.config = config;
    }

    /**
     * Builds a configured, not-yet-bound server bootstrap and hands it to a new
     * {@link NettyProxyServer}.
     *
     * @param pipelineFactory provides the handlers installed on each accepted channel
     * @return a proxy server wrapping the configured bootstrap
     */
    @Override
    public ProxyServer newProxy(final PipelineFactory pipelineFactory) {
        EventLoopGroup networkLoops = this.newEventLoopGroup(
                this.config.networkThreads(),
                "kafka-mqtt-network-thread",
                this.config.epollEnabled());
        final EventExecutorGroup streamExecutors = this.newEventLoopGroup(
                this.config.streamThreads(),
                "kafka-mqtt-stream-thread",
                this.config.epollEnabled());

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(networkLoops)
                .channel(this.channelClass(this.config.epollEnabled()))
                .localAddress(this.config.host(), this.config.port())
                // The pooled allocator is set on both the server channel and its children.
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        // Publish handlers are added without an explicit executor; the
                        // Kafka publish handler is bound to the stream executor group.
                        channel.pipeline()
                                .addLast(pipelineFactory.newPublishPipelineHandlers(channel))
                                .addLast(streamExecutors, pipelineFactory.newKafkaPublishHandler(channel));
                    }
                });
        return new NettyProxyServer(bootstrap);
    }

    /**
     * Creates an event loop group of the requested size whose threads come from a
     * {@link DefaultThreadFactory} built with the given prefix and {@code true} as its
     * second constructor argument (the daemon flag in Netty's API).
     *
     * @param threadNum    thread count passed straight to the group constructor
     * @param threadPrefix name prefix handed to the thread factory
     * @param hasEpoll     {@code true} to build an epoll group, {@code false} for NIO
     * @return the newly created event loop group
     */
    private MultithreadEventLoopGroup newEventLoopGroup(int threadNum, String threadPrefix, boolean hasEpoll) {
        ThreadFactory threadFactory = new DefaultThreadFactory(threadPrefix, true);
        if (hasEpoll) {
            return new EpollEventLoopGroup(threadNum, threadFactory);
        }
        return new NioEventLoopGroup(threadNum, threadFactory);
    }

    /**
     * Picks the server channel implementation matching the transport in use.
     *
     * @param hasEpoll {@code true} to select the epoll channel, {@code false} for NIO
     * @return the server socket channel class to register on the bootstrap
     */
    private Class<? extends ServerChannel> channelClass(boolean hasEpoll) {
        if (hasEpoll) {
            return EpollServerSocketChannel.class;
        }
        return NioServerSocketChannel.class;
    }
}

