推送系统

一、系统设计

Java高并发网络编程(五)Netty应用-LMLPHP

Java高并发网络编程(五)Netty应用-LMLPHP

二、拆包和粘包

public class XNettyServer {
public static void main(String[] args) throws Exception {
// 1、 线程定义
// accept 处理连接的线程池
EventLoopGroup acceptGroup = new NioEventLoopGroup();
// read io 处理数据的线程池
EventLoopGroup readGroup = new NioEventLoopGroup(); try {
ServerBootstrap b = new ServerBootstrap();
b.group(acceptGroup, readGroup);
// 2、 选择TCP协议,NIO的实现方式
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() { @Override
protected void initChannel(SocketChannel ch) throws Exception {
// 3、 职责链定义(请求收到后怎么处理)
ChannelPipeline pipeline = ch.pipeline();
// TODO 3.1 增加解码器
// pipeline.addLast(new XDecoder());
// TODO 3.2 打印出内容 handdler
pipeline.addLast(new XHandller());
}
});
// 4、 绑定端口
System.out.println("启动成功,端口 9999");
b.bind(9999).sync().channel().closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
readGroup.shutdownGracefully();
}
}
}

也会存在粘包和拆包的问题

Java高并发网络编程(五)Netty应用-LMLPHP

自己编写解析器

简单地用长度做处理

// 编解码一定是根据协议~如http
public class XDecoder extends ByteToMessageDecoder {
static final int PACKET_SIZE = 220; // 每次请求数据大小是220,我们自己定义的协议 // 用来临时保留没有处理过的请求报文,如只传过来了110个字节,先存着
ByteBuf tempMsg = Unpooled.buffer(); // in输入 --- 处理 --- out 输出
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes());
// in 请求的数据
// out 将粘在一起的报文拆分后的结果保留起来 // 1、 合并报文
ByteBuf message = null;
int tmpMsgSize = tempMsg.readableBytes();
// 如果暂存有上一次余下的请求报文,则合并
if (tmpMsgSize > 0) {
message = Unpooled.buffer();
message.writeBytes(tempMsg);
message.writeBytes(in);
System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
} else {
message = in;
} // 2、 拆分报文
// 这个场景下,一个请求固定长度为3,可以根据长度来拆分
// i+1 i+1 i+1 i+1 i+1
// 不固定长度,需要应用层协议来约定 如何计算长度
// 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
// dubbo rpc协议 = header(16) + body(不固定)
// header最后四个字节来标识body
// 长度 = 16 + body长度
// 0xda, 0xbb 魔数 int size = message.readableBytes();
int counter = size / PACKET_SIZE;
for (int i = 0; i < counter; i++) {
byte[] request = new byte[PACKET_SIZE];
// 每次从总的消息中读取220个字节的数据
message.readBytes(request); // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
out.add(Unpooled.copiedBuffer(request));
} // 3、多余的报文存起来
// 第一个报文: i+ 暂存
// 第二个报文: 1 与第一次
size = message.readableBytes();
if (size != 0) {
System.out.println("多余的数据长度:" + size);
// 剩下来的数据放到tempMsg暂存 留到下次再进行合并
tempMsg.clear();
tempMsg.writeBytes(message.readBytes(size));
}
}
}

上面的处理不适用复杂的现实场景,Netty提供了大量的现成的编解码工具,我们一般使用这些工具

三、使用websocket

websocket协议是基于TCP的一种新的网络协议。

它的出现实现了浏览器与服务器双全工(full-duplex)通信:允许服务器主动发送信息给客户端。

半双工:服务器不能主动响应浏览器,只能等待请求后再响应。

多客户端多语言多服务器支持:浏览器、php、Java、ruby、nginx、python、Tomcat、erlang、.net等等

Java高并发网络编程(五)Netty应用-LMLPHP

代码示例

public final class WebSocketServer {

    static int PORT = 9000;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.childHandler(new WebSocketServerInitializer())
.childOption(ChannelOption.SO_REUSEADDR, true);
for (int i = 0; i < 100; i++) { // 绑定100个端口
b.bind(++PORT).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if ("true".equals(System.getProperty("netease.debug")))
System.out.println("端口绑定完成:" + future.channel().localAddress());
}
});
} // 端口绑定完成,启动消息随机推送(测试)
TestCenter.startTest(); System.in.read();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

开启端口的复用ChannelOption.SO_REUSEADDR,这是底层的TCP的参数,和我们代码无关

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 职责链, 数据处理流程
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec()); // 转为http请求
pipeline.addLast(new HttpObjectAggregator(65536)); // 最大数据量
pipeline.addLast(new WebSocketServerHandler()); // websocket握手,处理后续消息
pipeline.addLast(new NewConnectHandler());
}
}
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final String WEBSOCKET_PATH = "/websocket";

    private WebSocketServerHandshaker handshaker;

    public static final LongAdder counter = new LongAdder();

    @Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
counter.add(1);
if (msg instanceof FullHttpRequest) {
// 处理websocket握手
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
// 处理websocket后续的消息
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
} private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
// Handle a bad request. //如果http解码失败 则返回http异常 并且判断消息头有没有包含Upgrade字段(协议升级)
if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
return;
} // 构造握手响应返回
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
// 版本不支持
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
ctx.fireChannelRead(req.retain()); // 继续传播
}
} private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// Check for closing frame 关闭
if (frame instanceof CloseWebSocketFrame) {
Object userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get();
TestCenter.removeConnection(userId);
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
if (frame instanceof PingWebSocketFrame) { // ping/pong作为心跳
System.out.println("ping: " + frame);
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) {
// Echo the frame
// TODO 处理具体的数据请求(... 云课堂聊天室,推送给其他的用户)
//发送到客户端websocket
ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()
+ ", 欢迎使用Netty WebSocket服务, 现在时刻:"
+ new java.util.Date().toString())); return;
}
// 不处理二进制消息
if (frame instanceof BinaryWebSocketFrame) {
// Echo the frame
ctx.write(frame.retain());
}
} private static void sendHttpResponse(
ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
// Generate an error page if response getStatus code is not OK (200).
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
} // Send the response and close the connection if necessary.
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
} private static String getWebSocketLocation(FullHttpRequest req) {
String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
return "ws://" + location;
}
}
// 新连接建立了
public class NewConnectHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
// 解析请求,判断token,拿到用户ID。
Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();
// String token = parameters.get("token").get(0); 不是所有人都能连接,比如需要登录之后,发放一个推送的token
String userId = parameters.get("userId").get(0);
ctx.channel().attr(AttributeKey.valueOf("userId")).getAndSet(userId); // channel中保存userId
TestCenter.saveConnection(userId, ctx.channel()); // 保存连接 // 结束
}
}

保存到TestCenter

// 正常情况是,后台系统通过接口请求,把数据丢到对应的MQ队列,再由推送服务器读取
public class TestCenter {
// 此处假设一个用户一台设备,否则用户的通道应该是多个。
// TODO 还应该有一个定时任务,用于检测失效的连接(类似缓存中的LRU算法,长时间不使用,就拿出来检测一下是否断开了);
static ConcurrentHashMap<String, Channel> userInfos = new ConcurrentHashMap<String, Channel>(); // 保存信息
public static void saveConnection(String userId, Channel channel) {
userInfos.put(userId, channel);
} // 退出的时候移除掉
public static void removeConnection(Object userId) {
if (userId != null) {
userInfos.remove(userId.toString());
}
} final static byte[] JUST_TEST = new byte[1024]; public static void startTest() {
// 发一个tony吧
System.arraycopy("tony".getBytes(), 0, JUST_TEST, 0, 4);
final String sendmsg = System.getProperty("netease.server.test.sendmsg", "false");
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
try {
// 压力测试,在用户中随机抽取1/10进行发送
if (userInfos.isEmpty()) {
return;
}
int size = userInfos.size();
ConcurrentHashMap.KeySetView<String, Channel> keySetView = userInfos.keySet();
String[] keys = keySetView.toArray(new String[]{});
System.out.println(WebSocketServerHandler.counter.sum() + " : 当前用户数量" + keys.length);
if (Boolean.valueOf(sendmsg)) { // 是否开启发送
for (int i = 0; i < (size > 10 ? size / 10 : size); i++) {
// 提交任务给它执行
String key = keys[new Random().nextInt(size)];
Channel channel = userInfos.get(key);
if (channel == null) {
continue;
}
if (!channel.isActive()) {
userInfos.remove(key);
continue;
}
channel.eventLoop().execute(() -> {
channel.writeAndFlush(new TextWebSocketFrame(new String(JUST_TEST))); // 推送1024字节
}); }
}
} catch (Exception ex) {
ex.printStackTrace();
}
}, 1000L, 2000L, TimeUnit.MILLISECONDS);
}
}

浏览器测试

<!-- saved from url=(0022)http://127.0.0.1:8080/ -->
<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>Web Socket Test</title></head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
// 随机数
var random = Math.floor(Math.random()*(10000 - 10 +1) + 10)
socket = new WebSocket("ws://127.0.0.1:9001/websocket?userId=" + random);
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data
};
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "Web Socket opened!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + "Web Socket closed";
};
} else {
alert("Your browser does not support Web Socket.");
} function send(message) {
if (!window.WebSocket) { return; }
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("The socket is not open.");
}
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"><input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)">
<h3>Output</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form> </body></html>

浏览器扛不住巨量的请求,使用Java客户端进行测试

public final class WebSocketClient {

    public static void main(String[] args) throws Exception {
final String host = System.getProperty("netease.pushserver.host", "127.0.0.1");
final String maxSize = System.getProperty("netease.client.port.maxSize", "100");
final String maxConnections = System.getProperty("netease.client.port.maxConnections", "60000");
int port = 9001; EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class);
b.option(ChannelOption.SO_REUSEADDR, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(8192));
p.addLast(WebSocketClientCompressionHandler.INSTANCE);
p.addLast("webSocketClientHandler", new WebSocketClientHandler());
}
});
// tcp 建立连接
for (int i = 0; i < 100; i++) { // 服务端有100个端口,发起对100个端口反复的连接
for (int j = 0; j < 60000; j++) { // 每个端口6万次连接
b.connect(host, port).sync().get();
}
port++;
}
System.in.read();
} finally {
group.shutdownGracefully();
}
}
}
// handler 处理多个事件~ 包括tcp连接建立之后的事件
// open websocket
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture; public ChannelFuture handshakeFuture() {
return handshakeFuture;
} @Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
} static AtomicInteger counter = new AtomicInteger(0); @Override
public void channelActive(ChannelHandlerContext ctx) {
if (handshaker == null) {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
URI uri = null;
try {
uri = new URI("ws://" + address.getHostString() + ":" + address.getPort() + "/websocket?userId=" + counter.incrementAndGet());
} catch (Exception e) {
e.printStackTrace();
}
handshaker = WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
}
handshaker.handshake(ctx.channel());
} @Override
public void channelInactive(ChannelHandlerContext ctx) {
if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client disconnected!");
} @Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
if ("true".equals(System.getProperty("netease.debug")))
System.out.println("WebSocket Client connected!");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
if ("true".equals(System.getProperty("netease.debug")))
System.out.println("WebSocket Client failed to connect");
handshakeFuture.setFailure(e);
}
return;
} if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
if ("true".equals(System.getProperty("netease.debug")))
System.out.println("WebSocket Client received message: " + textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
if ("true".equals(System.getProperty("netease.debug")))
System.out.println("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
if ("true".equals(System.getProperty("netease.debug")))
System.out.println("WebSocket Client received closing");
ch.close();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
}
}

网络四元组:客户端IP,服务端IP,客户端端口,服务端端口,这四元组规定了一个连接

客户端端口数量有限,服务器只有一个端口的情况下,同一个客户端只能对它发送6万多个连接,

服务器开启多个接口,服务器每开启一个端口,客户端可增加6万多连接

上面的测试环境,为了增加连接容纳量,服务端和客户端都开启了端口复用

Java高并发网络编程(五)Netty应用-LMLPHP

打包上传服务器 服务端6G4核,客户端6G2核

Java高并发网络编程(五)Netty应用-LMLPHP

运行服务端程序

Java高并发网络编程(五)Netty应用-LMLPHP

运行客户端程序

Java高并发网络编程(五)Netty应用-LMLPHP

客户端的端口是操作系统分配好的,也可以自己指定分配区间

报错

Java高并发网络编程(五)Netty应用-LMLPHP

文件描述符

Java高并发网络编程(五)Netty应用-LMLPHP

open files太小了,调参数

Java高并发网络编程(五)Netty应用-LMLPHP

Java高并发网络编程(五)Netty应用-LMLPHP

允许100万个文件描述符

重新登陆,生效

04-15 01:24