一、Netty 入门基础

1.1、基础概念

Netty是一个 NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty的内部实现是很复杂的,但是Netty提供了简单易用的api从网络处理代码中解耦业务逻辑。Netty是完全基于NIO实现的,所以整个Netty都是异步的

网络应用程序通常需要有较高的可扩展性,无论是Netty还是其他的基于JavaNIO的框架,都会提供可扩展性的解决方案。Netty中 一个关键组成部分是它的异步特性

Netty是最流行的NIO框架, 他的健壮性、功能、性能、可定制性和可扩展性在同类框架都是首屈一指的。 它已经得到成百,上千的商业/商用项目验证,如HadoopRPC框架Avro、 强大的RocketMQ、还有主流的分布式通信框架Dubbox等等


1.2 架构组成

Netty入门基础笔记-LMLPHP


二、入门案例

2.1 实现步骤

Netty实现通信的步骤:

  • 创建两个的NIO线程组,-一个专门用于网络事件处理(接受客户端的连接),另一-个则进行网络通信读写。
  • 创建一个ServerBootstrap对象, 配置Netty的一系列参数, 例如接受传出数据的缓存大小等等。
  • 创建一个实际处理数据的类Channellnitializer,进行初始化的准备工作,比如设置接受传出数据的字符集、格式、已经实际处理数据的接口。
  • 绑定端口,执行同步阻塞方法等待服务器端启动即可。

解读Netty示例: http://ifeve.com/netty5-user-guide/


2.2 入门案例

  • maven依赖
 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>5.0.0.Alpha2</version>
 </dependency>
  • 服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

	public static void main(String[] args) throws Exception {
		//1 创建线两个程组 
		//一个是用于处理服务器端接收客户端连接的
		//一个是进行网络通信的(网络读写的)
		EventLoopGroup pGroup = new NioEventLoopGroup();
		EventLoopGroup cGroup = new NioEventLoopGroup();

		//2 创建辅助工具类,用于服务器通道的一系列配置
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)		//绑定俩个线程组
		.channel(NioServerSocketChannel.class)		//指定NIO的模式
		.option(ChannelOption.SO_BACKLOG, 1024)		//设置tcp缓冲区
		.option(ChannelOption.SO_SNDBUF, 32*1024)	//设置发送缓冲大小
		.option(ChannelOption.SO_RCVBUF, 32*1024)	//这是接收缓冲大小
		.option(ChannelOption.SO_KEEPALIVE, true)	//保持连接
		.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//3 在这里配置具体数据接收方法的处理
				sc.pipeline().addLast(new ServerHandler());
			}
		});

		//4 进行绑定 
		ChannelFuture cf1 = b.bind(8765).sync();
		//5 等待关闭
		cf1.channel().closeFuture().sync();

		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
	}
}

  • 服务端的handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("server channel active... ");
	}
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
			ByteBuf buf = (ByteBuf) msg;
			byte[] req = new byte[buf.readableBytes()];
			buf.readBytes(req);
			String body = new String(req, "utf-8");
			System.out.println("Server :" + body );
			String response = "进行返回给客户端的响应:" + body ;
			ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
			//.addListener(ChannelFutureListener.CLOSE);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx)
			throws Exception {
		System.out.println("读完了");
		ctx.flush();
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
			throws Exception {
		ctx.close();
	}
}

  • 客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});

		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		//发送消息
		Thread.sleep(1000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
		Thread.sleep(2000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
		cf1.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}

  • 客户端handler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
	}
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {
			ByteBuf buf = (ByteBuf) msg;

			byte[] req = new byte[buf.readableBytes()];
			buf.readBytes(req);

			String body = new String(req, "utf-8");
			System.out.println("Client :" + body );
			String response = "收到服务器端的返回信息:" + body;
		} finally {
			ReferenceCountUtil.release(msg);
		}
	}
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}
}

三、TCP粘包、拆包问题

3.1 概念引入

TCP是一个“流”协议,所谓流就是没有界限的。大家可以想象下如果河里的水就好比数据,他们是连成一片的, 没有分界线,TCP底层并不了解上层的业务数据具体的含义,它会根据TCP缓冲区的实际情况进行包的划分,也就是说,在业务上, 我们一个完整的包可能会被TCP分成多个包进行发送, 也可能把多个小包封装成一个大的数据包发送出去,这就是所谓的TCP粘包、拆包问题。

3.2 问题原因与解决方案:

分析TCP粘包、拆包问题的产生原因:

  1. 应用程序write写入的字节大小大于套接口发送缓冲区的大小
  2. 进行MSS大小的TCP分段
  3. 以太网帧的payload大于MTU进行IP分片

粘包拆包问题的解决方案,常有三种方案:

  1. 消息定长,例如每个报文的大小固定为200个字节,如果不够,空位补空格;
  2. 在包尾部增加特殊字符进行分割,例如加回车等
  3. 讲消息分为消息头和消息体,在消息头中包含表示消息总长度的字段,然后进行业务逻辑的处理

3.3 Netty解决方法

  • 分隔符类DelimiterBasedFrameDecoder (自定义分隔符)
  • FixedLengthFrameDecoder(定长)

3.3.1 分隔符方式

  • 在服务端,修改辅助类
//服务器辅助类
	ServerBootstrap b = new ServerBootstrap();
	b.group(pGroup, cGroup)
	 .channel(NioServerSocketChannel.class)
	 .option(ChannelOption.SO_BACKLOG, 1024)
	 .option(ChannelOption.SO_SNDBUF, 32*1024)
	 .option(ChannelOption.SO_RCVBUF, 32*1024)
	 .childHandler(new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel sc) throws Exception {
			//设置特殊分隔符
			ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
			sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
			//设置字符串形式的解码
			sc.pipeline().addLast(new StringDecoder());
			sc.pipeline().addLast(new ServerHandler());
		}
	});
  • client端同时修改分隔符
Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<SocketChannel>() {
	@Override
	protected void initChannel(SocketChannel sc) throws Exception {
		ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
		sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
		sc.pipeline().addLast(new StringDecoder());
		sc.pipeline().addLast(new ClientHandler());
	}
});

3.3.2 长连接和短连接

长短连接问题,通过添加监听器,去监听服务端消息是否写完

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
		throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "utf-8");
		System.out.println("Server :" + body );
		String response = "进行返回给客户端的响应:" + body ;
		//当确认服务端确认写完,则断开连接
		ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
		.addListener(ChannelFutureListener.CLOSE);
}

3.3.3 服务端多次write,一次flush

public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});

		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		//发送消息  
		Thread.sleep(1000);
		//写的缓存区
		cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
		cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
		cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
		//需要flush。否则不会写出
		cf1.channel().flush();
		cf1.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}
  • 控制台输出情况:
 Client :进行返回给客户端的响应:hello nettyhello nettyhello netty

3.3.4 停顿发送


public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});

		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		//发送消息
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
		//停顿,当一个间隔,则会发生一次输出。
		Thread.sleep(2000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
		cf1.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}
  • 控制台输出情况
Client :进行返回给客户端的响应:777666
Client :进行返回给客户端的响应:888

3.3.5 绑定多个 端口

  • 服务端
public class Server {

	public static void main(String[] args) throws Exception {
		EventLoopGroup pGroup = new NioEventLoopGroup();
		EventLoopGroup cGroup = new NioEventLoopGroup();
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)		//绑定俩个线程组
		.channel(NioServerSocketChannel.class)		//指定NIO的模式
		.option(ChannelOption.SO_BACKLOG, 1024)		//设置tcp缓冲区
		.option(ChannelOption.SO_SNDBUF, 32*1024)	//设置发送缓冲大小
		.option(ChannelOption.SO_RCVBUF, 32*1024)	//这是接收缓冲大小
		.option(ChannelOption.SO_KEEPALIVE, true)	//保持连接
		.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ServerHandler());
			}
		});

		//绑定多个端口
		ChannelFuture cf1 = b.bind(8765).sync();
		ChannelFuture cf2 = b.bind(8764).sync();
		cf1.channel().closeFuture().sync();
		cf2.channel().closeFuture().sync();
		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
	}
}

  • client代码
public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});

		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
		//发送消息
		Thread.sleep(1000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
		cf2.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
		Thread.sleep(2000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
		cf2.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
		cf1.channel().closeFuture().sync();
		cf2.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}


3.4 FixedLengthFrameDecoder (定长)方式

  • 服务端设置定长为5 ; 如果长度没有达到5,则不输出。
public class Server {
	public static void main(String[] args) throws Exception{
		//1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的
		EventLoopGroup pGroup = new NioEventLoopGroup();
		EventLoopGroup cGroup = new NioEventLoopGroup();

		//2 创建服务器辅助类
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)
		 .channel(NioServerSocketChannel.class)
		 .option(ChannelOption.SO_BACKLOG, 1024)
		 .option(ChannelOption.SO_SNDBUF, 32*1024)
		 .option(ChannelOption.SO_RCVBUF, 32*1024)
		 .childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//设置定长字符串接收
				sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
				//设置字符串形式的解码
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ServerHandler());
			}
		});
		//4 绑定连接
		ChannelFuture cf = b.bind(8765).sync();
		//等待服务器监听端口关闭
		cf.channel().closeFuture().sync();
		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
	}
}
  • client 设置长度
public class Client {
	public static void main(String[] args) throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		 .channel(NioSocketChannel.class)
		 .handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//设置定长
				sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
				sc.pipeline().addLast(new StringDecoder());
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
		cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes()));
		cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes()));
		//等待客户端端口关闭
		cf.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}

参考

详细解读可以参考这篇文章:http://ifeve.com/netty5-user-guide/

10-05 14:00