基于netty 和 websocket 做一个即时通信 聊天的小应用练习

先来了解即时通信 , 一般会使用三种实现方式:

  • Ajax 轮训
  • Long pull
  • websocket

有很多的例子,比如一些电脑上群组聊天室,手游中的聊天平台等等,都需要一个实时通信,如何实现双向通信

Ajax轮训,是制定每过几秒钟,去ajax异步请求同步服务器新的数据

Long pull也是采用循环的方式,是一种阻塞的模式,当发出请求,如果服务器不响应,他就会一直卡住不动,早期的通信方式

websocket最初由H5提起,是一种协议,http1.1是支持长链接,http1.0是不支持长链接的,websocket基于TCP协议之上,提供具有持久性的协议

对比http每发起一个,必然存在request和response,且是1:1对应的

websocket的优点使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据, 只需要一次链接即可保持长久链接传输数据,除非自己退出游戏了,重新上线

Web即时通信

基于之前的经验,先写一个server服务端

package com.yus.netty.server;

import io.netty.bootstrap.BootstrapConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * websocket 服务端
 */
public class WebSocketServer {

    public static void main(String[] args) throws InterruptedException {
        //采用主从线程组模型
        //主线程组
        EventLoopGroup primaryGroup = new NioEventLoopGroup();

        //从线程组
        EventLoopGroup subGroup = new NioEventLoopGroup();

        try {
            //服务启动器
            ServerBootstrap bootstrap = new ServerBootstrap();

            //建立通道,管道以及助手处理类 入口
            bootstrap.group(primaryGroup, subGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebSocketChannelInit());

            //绑定端口
            ChannelFuture future = bootstrap.bind(8081).sync();
            future.channel().closeFuture().sync();
        } finally {
            //关闭
            primaryGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }

    }
}

初始化器

package com.yus.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * 通道初始化器
 */
public class WebSocketChannelInit extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //获取管道
        ChannelPipeline pipeline = ch.pipeline();

        //=====开始============用于支持Http协议的处理类=================
        //通信 netty提供的编解码处理类
        pipeline.addLast(new HttpServerCodec());

        //对处理数量大的数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());

        //聚合器,方便处理http消息 1024*64 为消息最大长度(byte)支持http协议的必要处理类
        pipeline.addLast(new HttpObjectAggregator(1024*64));
        //=====结束============用于支持Http协议的处理类=================

        // 支持websocket协议的处理类,建立链接时使用
        // /ws指定客户端访问服务端的路由,可随便自定义,这边写ws是websocket简写
        // 该处理类帮我们处理繁重的事情并run websocket服务端,
        // 并管理通信握手动作(包括close关闭,Ping请求,Pong响应)Ping+Pong=心跳,关于心跳后续再做说明
        // 并以frame进行数据传输,不同的数据类型,frame也不同
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        //自定义 处理类,主要用于读取客户端消息,然后对消息进行处理,最后可以返回给客户端
        pipeline.addLast("myHandle", new MyHandle());
    }
}

自定义处理类

package com.yus.netty.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 自定义处理类
 * 在写初识化器时有说明,关于websocket传输时,主要以frames方式传输
 * 在Netty中frame会专门为websocket处理 文本 的对象 - TextWebSocketFrame
 * frame是消息的载体
 */
public class MyHandle extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 第四步
    // ChannelGroup:记录和管理Channel,使用DefaultChannelGroup默认实现,GlobalEventExecutor全局初始化
    private static ChannelGroup channelClient = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //第一步:从消息载体中获取客户端的消息
        String content = msg.text();
        System.out.println("消息:" + content);

        //第二步:
        //拿到消息文本,然后将消息发给所有客户端,这时不管有多少个客户端
        //都可以将此客户端的消息给所有的客户端,每一个客户端会注册一个channel进来
        //通过channel通道进行消息推送出去,这时候就用到了上次学习的Channel的方法周期,
        //生命周期 重写handlerAdded 和 handlerRemoved
        Channel channel = ctx.channel();

        //第七步
        //将数据 刷到所有的客户端 第一种方式
        for (Channel channels : channelClient){
            //注意 这边的载体是泛型TextWebSocketFrame ,不能直接String扔出去
            //要将消息放入载体,再送出去
            channels.writeAndFlush(new TextWebSocketFrame("我在哪里,我被送出去了吗?"));

        }

        //将数据 刷到所有的客户端 第二种 方式直接用ChannelGroup的writeAndFlush
        //channelClient.writeAndFlush(new TextWebSocketFrame("我又在哪里,我被送出去了吗?"));
    }

    /**
     * 第三步
     * 客户端创建了,触发
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //获取客户端的channel双向通道
        Channel channel = ctx.channel();

        //第五步
        //添加到ChannelGroup,方便全局管理
        channelClient.add(channel);
    }

    /**
     * 第六步
     * 客户端离开了,触发
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //当触发这个handlerRemoved时,其实ChannelGroup会自动移除对应客户端的通道channel

        //所以不需要我们去调remove的方法,测试发现是多余的
        //channelClient.remove(ctx.channel());

        //ctx.channel().id()中存在两个ID,一个长ID,一个短ID,如果学习过zookeeper的同学会熟悉一些
        //服务少的时候,短ID冲突的可能性小,会用短ID进行选择,反之就是长ID
        System.out.println("Channel 长ID为 " + ctx.channel().id().asLongText() + "客户端离开了");
        System.out.println("Channel 短ID为 " + ctx.channel().id().asShortText() + "客户端离开了");
    }
}

编写完后端,已经快1点钟了,睡觉了,明天继续写个测试前端页面

--------------------------------------------------

12-20 06:15