目录

Netty ByteBuf(图解 )之一

疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之15 【 博客园 总入口


源码工程

源码IDEA工程获取链接Java 聊天室 实战 源码

写在前面

​ 大家好,我是作者尼恩。

​ 今天是百万级流量 Netty 聊天器 打造的系列文章的第15篇,这是一个基础篇。

​ 由于关于ByteBuf的内容比较多,分两篇文章:

​ 第一篇:图解 ByteBuf的分配、释放和如何避免内存泄露

​ 第二篇:图解 ByteBuf的具体使用

本篇为第一篇

Netty ByteBuf 优势

Netty 提供了ByteBuf,来替代Java NIO的 ByteBuffer 缓,来操纵内存缓冲区。

与Java NIO的 ByteBuffer 相比,ByteBuf的优势如下:

  1. Pooling (池化,这点减少了内存复制和GC,提升效率)

  2. 可以自定义缓冲类型

  3. 通过一个内置的复合缓冲类型实现零拷贝

  4. 扩展性好,比如 StringBuffer

  5. 不需要调用 flip()来切换读/写模式

  6. 读取和写入索引分开

  7. 方法链

  8. 引用计数

手动获取与释放ByteBuf

Netty环境下,业务处理的代码,基本上都在Handler处理器中的各个入站和出站方法中。

一般情况下,采用如下方法获取一个Java 堆中的缓冲区:

使用完成后,通过如下的方法,释放缓冲区:

上面的代码很简单,通过release方法减去 heapBuffer 的使用计数,Netty 会自动回收 heapBuffer 。

缓冲区内存的回收、二次分配等管理工作,是 Netty 自动完成的。

自动获取和释放 ByteBuf

方式一:TailHandler 自动释放

​ Netty默认会在ChannelPipline的最后添加的那个 TailHandler 帮你完成 ByteBuf的release。

​ 先看看,自动创建的ByteBuf实例是如何登场的?

Netty自动创建 ByteBuf实例

​ Netty 的 Reactor 线程会在 AbstractNioByteChannel.NioByteUnsafe.read() 处调用 ByteBufAllocator创建ByteBuf实例,将TCP缓冲区的数据读取到 Bytebuf 实例中,并调用 pipeline.fireChannelRead(byteBuf) 进入pipeline 入站处理流水线。

默认情况下,TailHandler自动释放掉ByteBuf实例

​ Netty的ChannelPipleline的流水线的末端是TailHandler,默认情况下如果每个入站处理器Handler都把消息往下传,TailHandler会释放掉ReferenceCounted类型的消息。
ByteBuf(图解)-LMLPHP

如果没有到达末端呢?

​ 一种没有到达入站处理流水线pipeline末端的情况,如下图所示:
ByteBuf(图解)-LMLPHP

​ 这种场景下,也有一种自动释放的解决办法,它就是:

​ 可以继承 SimpleChannelInboundHandler,实现业务Handler。 SimpleChannelInboundHandler 会完成ByteBuf 的自动释放,释放的处理工作,在其入站处理方法 channelRead 中。

方式二:SimpleChannelInboundHandler 自动释放

​ 如果业务Handler需要将 ChannelPipleline的流水线的默认处理流程截断,不进行后边的inbound入站处理操作,这时候末端 TailHandler自动释放缓冲区的工作,自然就失效了。

​ 这种场景下,业务Handler 有两种选择:

  • 手动释放 ByteBuf 实例

  • 继承 SimpleChannelInboundHandler,利用它的自动释放功能。

本小节,我们聚焦的是第二种选择:看看 SimpleChannelInboundHandler是如何自动释放的。

利用这种方法,业务处理Handler 必须继承 SimpleChannelInboundHandler基类。并且,业务处理的代码,必须 移动到 重写的 channelRead0(ctx, msg)方法中。

如果好奇,想看看 SimpleChannelInboundHandler 是如何释放ByteBuf 的,那就一起来看看Netty源码。

截取的代码如下所示:

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter 
{
//...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

源码中,执行完重写的channelRead0()后,在 finally 语句块中,ByteBuf 的生命被结束掉了。

方式三:HeadHandler 自动释放

出站处理流程中,申请分配到的 ByteBuf,通过 HeadHandler 完成自动释放。

出站处理用到的 Bytebuf 缓冲区,一般是要发送的消息,通常由应用所申请。在出站流程开始的时候,通过调用 ctx.writeAndFlush(msg),Bytebuf 缓冲区开始进入出站处理的 pipeline 流水线 。在每一个出站Handler中的处理完成后,最后消息会来到出站的最后一棒 HeadHandler,再经过一轮复杂的调用,在flush完成后终将被release掉。

ByteBuf(图解)-LMLPHP

如何避免内存泄露

​ 基本上,在 Netty的开发中,通过 ChannelHandlerContext 或 Channel 获取的缓冲区ByteBuf 默认都是Pooled,所以需要再合适的时机对其进行释放,避免造成内存泄漏。

自动释放的注意事项

  • 通过 TailHandler 自动释放入站 ByteBuf

  • 继承 SimpleChannelInboundHandler 的完成 入站ByteBuf 自动释放

  • 通过HeadHandler自动释放出站 ByteBuf

    自动释放,注意事项如下

  • 入站处理流程中,如果对原消息不做处理,默认会调用 ctx.fireChannelRead(msg) 把原消息往下传,由流水线最后一棒 TailHandler 完成自动释放。

  • 如果截断了入站处理流水线,则可以继承 SimpleChannelInboundHandler ,完成入站ByteBuf 自动释放。

  • 出站处理过程中,申请分配到的 ByteBuf,通过 HeadHandler 完成自动释放。

    出站处理用到的 Bytebuf 缓冲区,一般是要发送的消息,通常由应用所申请。在出站流程开始的时候,通过调用 ctx.writeAndFlush(msg),Bytebuf 缓冲区开始进入出站处理的 pipeline 流水线 。在每一个出站Handler中的处理完成后,最后消息会来到出站的最后一棒 HeadHandler,再经过一轮复杂的调用,在flush完成后终将被release掉。

手动释放的注意事项

​ 手动释放是自动释放的重要补充和辅助。

手动释放操作,大致有如下注意事项

  • 入站处理中,如果将原消息转化为新的消息并调用 ctx.fireChannelRead(newMsg)往下传,那必须把原消息release掉;

  • 入站处理中,如果已经不再调用 ctx.fireChannelRead(msg) 传递任何消息,也没有继承SimpleChannelInboundHandler 完成自动释放,那更要把原消息release掉;

  • 多层的异常处理机制,有些异常处理的地方不一定准确知道ByteBuf之前释放了没有,可以在释放前加上引用计数大于0的判断避免异常; 有时候不清楚ByteBuf被引用了多少次,但又必须在此进行彻底的释放,可以循环调用reelase()直到返回true。

总之,只要是在传递过程中,没有传递下去的ByteBuf就需要手动释放,避免不必要的内存泄露

缓冲区 Allocator 分配器

Netty通过 ByteBufAllocator分配缓冲区。

分配器 Allocator的类型

ByteBuf(图解)-LMLPHP

PooledByteBufAllocator:可以重复利用之前分配的内存空间。

为了减少内存的分配回收以及产生的内存碎片,Netty提供了PooledByteBufAllocator 用来分配可回收的ByteBuf,可以把PooledByteBufAllocator 看做一个池子,需要的时候从里面获取ByteBuf,用完了放回去,以此提高性能。

UnpooledByteBufAllocator:不可重复利用,由JVM GC负责回收

顾名思义Unpooled就是不会放到池子里,所以根据该分配器分配的ByteBuf,不需要放回池子,由JVM自己GC回收。

这两个类,都是AbstractByteBufAllocator的子类,AbstractByteBufAllocator实现了一个接口,叫做ByteBufAllocator。

默认的分配器

​ 默认的分配器 ByteBufAllocator.DEFAULT ,可以通过 Java 系统参数(SystemProperty )选项 io.netty.allocator.type 去配置,使用字符串值:"unpooled","pooled"。

​ 关于这一段,Netty的源代码截取如下:

        String allocType = SystemPropertyUtil.get("io.netty.allocator.type", "unpooled").toLowerCase(Locale.US).trim();
        Object alloc;
        if("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: unpooled (unknown: {})", allocType);
        }
不同的Netty版本,源码不一样。

​ 上面的代码,是4.0版本的源码,默认为UnpooledByteBufAllocator。

​ 而4.1 版本,默认为 PooledByteBufAllocator。因此,4.1版本的代码,是和上面的代码稍微有些不同的。

设置通道Channel的分配器

在4.x版本中,UnpooledByteBufAllocator是默认的allocator,尽管其存在某些限制。

现在PooledByteBufAllocator已经广泛使用一段时间,并且我们有了增强的缓冲区泄漏追踪机制,所以是时候让PooledByteBufAllocator成为默认了。

ServerBootstrap b = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .localAddress(port)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(...);
            }
        });

使用Netty带来的又一个好处就是内存管理。只需一行简单的配置,就能获得到内存池带来的好处。在底层,Netty实现了一个Java版的Jemalloc内存管理库,为我们做完了所有“脏活累活”!

缓冲区内存的类型

​ 说完了分配器的类型,再来说下缓冲区的类型。

​ 依据内存的管理方不同,分为堆缓存和直接缓存。也就是Heap ByteBuf 和 Direct ByteBuf。另外,为了方便缓冲区进行组合,提供了一种组合缓存区。

ByteBuf(图解)-LMLPHP

​ 三种缓冲区的介绍如下:

​ 上面三种缓冲区的类型,无论哪一种,都可以通过池化、非池化的方式,去获取。

Unpooled 非池化缓冲区的使用方法

Unpooled也是用来创建缓冲区的工具类,Unpooled 的使用也很容易。

看下面代码:

Unpooled 提供了很多方法,详细方法大致如下:

写在最后

​ 至此为止,终于完成ByteBuf的分配、释放和如何避免内存泄露介绍。

​ 接下来是:

​ 第二篇:图解 ByteBuf的具体使用


疯狂创客圈 Java 死磕系列

  • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

  • Netty 源码、原理、JAVA NIO 原理
  • Java 面试题 一网打尽
  • 疯狂创客圈 【 博客园 总入口 】


11-19 08:53