用Netty开发中间件:高并发性能优化

最近在写一个后台中间件的原型,主要是做消息的分发和透传。因为要用Java实现,所以网络通信框架的第一选择当然就是Netty了,使用的是Netty 4版本。Netty果然效率很高,不用做太多努力就能达到一个比较高的tps。但使用过程中也碰到了一些问题,个人觉得都是比较经典而在网上又不太容易查找到相关资料的问题,所以在此总结一下。

1.Context Switch过高

压测时用nmon监控内核,发现Context Switch高达30w+。这明显不正常,但JVM能有什么导致Context Switch。参考之前整理过的恐龙书《Operating System Concept》的读书笔记《进程调度》和Wiki上的Context Switch介绍,进程/线程发生上下文切换的原因有:

  • I/O等待:在多任务系统中,进程主动发起I/O请求,但I/O设备还没有准备好,所以会发生I/O阻塞,进程进入Wait状态。
  • 时间片耗尽:在多任务分时系统中,内核分配给进程的时间片已经耗尽了,进程进入Ready状态,等待内核重新分配时间片后的执行机会。
  • 硬件中断:在抢占式的多任务分时系统中,I/O设备可以在任意时刻发生中断,CPU会停下当前正在执行的进程去处理中断,因此进程进入Ready状态。

根据分析,重点就放在第一个和第二个因素上。

1.1 非阻塞I/O

针对第一个因素I/O等待,最直接的解决办法就是使用非阻塞I/O操作。在Netty中,就是服务端和客户端都使用NIO。

这里在说一下如何主动的向Netty的Channel写入数据,因为网络上搜到的资料都是千篇一律:服务端就是接到请求后在Handler中写入返回数据,而客户端的例子竟然也都是在Handler里Channel Active之后发送数据。因为要做消息透传,而且是向下游系统发消息时是异步非阻塞的,网上那种例子根本没法用,所以在这里说一下我的方法吧。

关于服务端,在接收到请求后,在channelRead0()中通过ctx.channel()得到Channel,然后就通过ThreadLocal变量或其他方法,只要能把这个Channel保存住就行。当需要返回响应数据时就主动向持有的Channel写数据。具体请参照后面第4节。

关于客户端也是同理,在启动客户端之后要拿到Channel,当要主动发送数据时就向Channel中写入。

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(...);
}
}); try {
ChannelFuture future = b.connect().sync();
this.channel = future.channel();
}
catch (InterruptedException e) {
throw new IllegalStateException("Error when start netty client: addr=[" + addr + "]", e);
}

1.2 减少线程数

线程太多的话每个线程得到的时间片就少,CPU要让各个线程都有机会执行就要切换,切换就要不断保存和还原线程的上下文现场。于是检查Netty的I/O worker的EventLoopGroup。之前在《Netty 4源码解析:服务端启动》中曾经分析过,EventLoopGroup默认的线程数是CPU核数的二倍。所以手动配置NioEventLoopGroup的线程数,减少一些I/O线程。

private void doStartNettyServer(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(...);
}
}); // Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

此外因为还用了Akka作为业务线程池,所以还看了下如何修改Akka的默认配置。方法是新建一个叫做application.conf的配置文件,我们创建ActorSystem时会自动加载这个配置文件,下面的配置文件中定制了一个dispatcher:

my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 16
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}

简单来说,最关键的几个配置项是:

  • parallelism-factor:决定线程池的大小(竟然不是parallelism-max)。
  • throughput:决定coroutine的切换频率,1是最为频繁也最为公平的设置。

因为本篇主要是介绍Netty的,所以具体含义就详细介绍了,请参考官方文档中对Dispatcher和Mailbox的介绍。创建特定Dispatcher的Akka很简单,以下是创建类型化Actor时指定Dispatcher的方法。

TypedActor.get(system).typedActorOf(
new TypedProps<MyActorImpl>(
MyActor.class,
new Creator<MyActorImpl>() {
@Override
public MyActorImpl create() throws Exception {
return new MyActorImpl(XXX);
}
}
).withDispatcher("my-dispatcher")
);

1.3 去业务线程池

尽管上面做了种种改进配置,用jstack查看线程配置确实生效了,但Context Switch的状况并没有好转。于是干脆去掉Akka实现的业务线程池,彻底减少线程上下文的切换。发现CS从30w+一下子降到了16w!费了好大力气在万能的StackOverflow上查到了一篇文章,其中一句话点醒了我:

有了线索就赶紧去查Netty源码,发现的确像调用channel.write()操作不是在当前线程上执行。Netty内部统一使用executor.inEventLoop()判断当前线程是否是EventLoopGroup的线程,否则会包装好Task交给内部线程池执行:

private void write(Object msg, boolean flush, ChannelPromise promise) {

        AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeWrite(msg, promise);
if (flush) {
next.invokeFlush();
}
} else {
int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
Runnable task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, msg, size, promise);
} else {
task = WriteTask.newInstance(next, msg, size, promise);
}
safeExecute(executor, task, promise, msg);
}
}

业务线程池原来是把双刃剑。虽然将任务交给业务线程池异步执行降低了Netty的I/O线程的占用时间、减轻了压力,但同时业务线程池增加了线程上下文切换的次数。通过上述这些优化手段,终于将压测时的CS从每秒30w+降到了8w左右,效果还是挺明显的!


2.系统调用开销

系统调用一般会涉及到从User Space到Kernel Space的模态转换(Mode Transition或Mode Switch)。这种转换也是有一定开销的。

Netty涉及的系统调用最多的就是网络通信操作了,所以为了降低系统调用的频度,最直接的方法就是缓冲输出内容,达到一定的数据大小、写入次数或时间间隔时才flush缓冲区

对于缓冲区大小不足,写入速度过快等问题,Netty提供了writeBufferLowWaterMark和writeBufferHighWaterMark选项,当缓冲区达到一定大小时则不能写入,避免被撑爆。感觉跟Netty提供的Traffic Shaping流量整形功能有点像呢。具体还未深入研究,感兴趣的同学可以自行学习一下。


3.Zero Copy实现

《Netty权威指南(第二版)》中专门有一节介绍Netty的Zero Copy,但针对的是Netty内部的零拷贝功能。我们这里想谈的是如何在应用代码中实现Zero Copy,最典型的应用场景就是消息透传。因为透传不需要完整解析消息,只需要知道消息要转发给下游哪个系统就足够了。所以透传时,我们可以只解析出部分消息,消息整体还原封不动地放在Direct Buffer里,最后直接将它写入到连接下游系统的Channel中。所以应用层的Zero Copy实现就分为两部分:Direct Buffer配置和Buffer的零拷贝传递。

3.1 内存池

使用Netty带来的又一个好处就是内存管理。只需一行简单的配置,就能获得到内存池带来的好处。在底层,Netty实现了一个Java版的Jemalloc内存管理库(还记得Redis自带的那个吗),为我们做完了所有“脏活累活”!

ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(...);
}
});

3.2 应用层的Zero Copy

默认情况下,Netty会自动释放ByteBuf。也就是说当我们覆写的channelRead0()返回时,ByteBuf就结束了它的使命,被Netty自动释放掉(如果是池化的就可会被放回到内存池中)。

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
}

因为Netty是用引用计数的方式来判断是否回收的,所以要想继续使用ByteBuf而不让Netty释放的话,就要增加它的引用计数。只要我们在ChannelPipeline中的任意一个Handler中调用ByteBuf.retain()将引用计数加1,Netty就不会释放掉它了。我们在连接下游的客户端的Encoder中发送消息成功后再释放掉,这样就达到了零拷贝透传的效果:

public class RespEncoder extends MessageToByteEncoder<Resp> {

    @Override
protected void encode(ChannelHandlerContext ctx, Msg msg, ByteBuf out) throws Exception {
// Raw in Msg is retained ByteBuf
out.writeBytes(msg.getRaw(), 0, msg.getRaw().readerIndex());
msg.getRaw().release();
} }

4.并发下的状态处理

前面第1.1节介绍的异步写入持有的Channel和第2节介绍的根据一定规则flush缓冲区等等,都涉及到状态的保存。如果要并发访问这些状态的话,就要提防并发的race condition问题,避免更新冲突、丢失等等。

4.1 Channel保存

在Netty服务端的Handler里如何持有Channel呢?我是这样做的,在channelActive()或第一次进入channelRead0()时创建一个Session对象持有Channel。因为之前在《Netty 4源码解析:请求处理》中曾经分析过Netty 4的线程模型:多个客户端可能会对应一个EventLoop线程,但对于一个客户端来说只能对应一个EventLoop线程。每个客户端都对应自己的Handler实例,并且一直使用到连接断开

public class FrontendHandler extends SimpleChannelInboundHandler<Msg> {

    private Session session;

    @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
session = factory.createSession(ctx.channel());
super.channelActive(ctx);
} @Override
protected void channelRead0(final ChannelHandlerContext ctx, Msg msg) throws Exception {
session.handleRequest(msg);
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
session = null;
super.channelInactive(ctx);
} }

4.2 Decoder状态

因为网络粘包拆包等因素,Decoder不可避免的要保存一些解析过程的中间状态。因为Netty对于每个客户端的生命周期内会一直使用同一个Decoder实例,所以解析完成后一定要重置中间状态,避免后续解析错误。

public class RespDecoder extends ReplayingDecoder {

    public MsgDecoder() {
doCleanUp();
} @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
if (doParseMsg(in)) {
doSendToHandler(out);
doCleanUp();
}
}
}

5.总结

5.1 多变的Netty

总结之前先吐槽一下,令人又爱又恨的Netty更新速度。从Netty 3到Netty 4,API发生了一次“大地震”,好多网上的示例程序都是基于Netty 3,所以学习Netty 4时发现好多例子都跑不起来了。除了API,Netty内部的线程模型等等变化就更不用说了。本以为用上了Netty 4就可以安心了,结果Netty 5的线程模型又-变-了!看看官方文档里的说法吧,升级的话又要注意了。

根据官方文档的说法,Netty不再保证特定的Handler实例在运行时一定对应一个线程,所以,在Handler中用ThreadLocal的话就是比较危险的写法了!

5.2 高并发编程技巧

经过上面的种种琢磨和努力,tps终于从几千达到了5w左右,学到了很多之前不懂的网络编程和性能优化的知识,还是很有成就感的!总结一下,高并发中间件的优化策略有:

  • 线程数控制:高并发下如果线程较多时,Context Switch会非常明显,超过CPU核心数的线程不会带来任何好处。不是特别耗时的操作的话,业务线程池也是有害无益的。Netty 5为我们提供了指定底层线程池的机会,这样能更好的控制整个中间件的线程数和调度策略。
  • 非阻塞I/O操作:要想线程少还多做事,避免阻塞是一定要做的。
  • 减少系统调用:虽然Mode Switch比Context Switch的开销要小得多,但我们还是要尽量减少频繁的syscall。
  • 数据零拷贝:从内核空间的Direct Buffer拷贝到用户空间,每次透传都拷贝的话累积起来是个不小的开销。
  • 共享状态保护:中间件内部的并发处理也是决定性能的关键。
04-16 05:37