一、netty简介

netty是什么,netty干什么的?

推荐阅读:https://blog.csdn.net/bjweimengshu/article/details/78786315

 

二、一个简单的netty应用

1.一个最简单的netty应用,至少应该有两个类,启动类(sever/client)和处理类(handler)

2.启动类:用于配置应用的各种参数(如group,chanel,chilhandler,option,childOption,以及端口),最主要的则是添加handler,也就是处理的业务逻辑。

3.处理类:处理的具体逻辑。一般通过继承ChannelHandlerAdapter或者其子类(当然也可以通过实现ChannelHandler)

上代码,一个较为简单的应用

EchoServer.java


public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    //指定NioServerSocketChannel用于实例化一个Channel,该channel是来接收 新进来的连接的
                    .channel(NioServerSocketChannel.class)

                    //指定handler。ChannelInitializer是一个特别的handler,用于帮助用户配置一个新的Channel
                    //通过ch.pipeline().addLast(new DiscardServerHandler());添加我们定义handler,
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //EchoServerHandler在这里添加
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })

                    //对Channel进行一些配置
                    // 注意以下是socket的标准参数
                    // BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
                    // 用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java使用默认值50
                    // Option是为了NioServerSocketChannel设置的,用来接收传入连接的
                    .option(ChannelOption.SO_BACKLOG, 128)

                    //是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)
                    //并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活
                    //childOption是用来给父级ServerChannel之下的Channels设置参数的
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            //绑定端口并且启动应用来,等待连接
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port= 8080;
        new EchoServer(port).run();
    }

EchoServerHandler.java

//继承ChannelInboundHandlerAdapter,重写其中的方法
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    //数据传入时的回调
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    //出现异常时的回调
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

说明:启动EchoServer之后,打开windows的cmd命令窗口,键入telnet localhost 8080 会连接到应用。在这输入任意字符,都会输出到ide的控制台。(windows默认是没有开启telnet的,自行百度)

 

三、数据的入站和出站

即信息的输入输出。入站:数据从远程机器发送到本机。出站:数据从本机发送到远程机器。

netty是以流进行传输数据,所以‘进站‘和‘入站’,都需要通过ByteBuf来进行数据的传输。

 

发送数据的步骤(对比看代码TimeServerHandler的channelActive方法)

1.通过ChannelHandlerContext上下文获取一个ByteBuf,

(代码:final ByteBuf time = ctx.alloc().buffer(4);)

2.将数据写入ByteBuf

(代码:time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L));)

3.将ByteBuf写入ChannelHandlerContext中,并且强制刷入数据

(代码:final ChannelFuture f = ctx.writeAndFlush(time);)

4.监听ChannelFuture,待数据传输完成,关闭连接

 

接收数据 (对比看代码TimeClientHandler的ChannelRead方法)

channelRead方法直接把接收的数据当作参数传入方法里了,但是,传入的时Object对象,需要强制转换成ByteBuf对象,然后读取其中的字节。

来看一个例子TimeServer和TimeClient

TimeServer.java,启动器类,与之前的EchoServer差不多,只是在添加处理器的时候不一样。

public class TimeServer {

    private int port;

    public TimeServer(int port) { this.port = port; }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port=8080;
        new TimeServer(port).run();
    }
}

再看TimeServerHandler.java

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override

    //这里只 覆盖了channelActive()方法,该方法成功建立连接时的回调。
    //因为,客户端一连接上服务端,服务端就给客户端发送一个时间过去,然后断开连接。
    public void channelActive(final ChannelHandlerContext ctx) {
        //一个整数是4个字节
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L));
        //netty是非阻塞是的io,所以它会立刻返回一个ChannelFuture对象(无论数据是否传输完成)
        //该对象可用于监听写操作是否完成
        final ChannelFuture f = ctx.writeAndFlush(time);
        //监听这个操作,等这个操作(ctx.writeAndFlush(time)将数据输出)完成之后,再调用close()
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

还有TimeClient.java

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host="localhost";
        int port=8080;
        //服务端有两个NioEventLoopGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //服务端是ServerBootstarp,客户端则是Bootstarp
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    //TimeClientHandler在这添加
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            // 连接
            ChannelFuture f = b.connect(host, port).sync();
            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

最后TimeClientHandler.java

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    //这里重写了channelRead(),该方法是在接收到远程传输的数据时的回调
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        try {
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            buf.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

说明:先启动TimeServer,再启动TimeClient。成功连接后,服务端会传回一个时间给客户端

 

四、传输对象

如何传输对象?将对象序列化自然就能够将在网络中传输了。

 

首先 序列化和反序列化的工具类ByteObjConverter.java


import java.io.*;

public class ByteObjConverter {

    public static Object byteToObject(byte[] bytes) {
        Object obj = null;
        ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
        ObjectInputStream oi = null;
        try {
            oi = new ObjectInputStream(bi);
            obj = oi.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                bi.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                oi.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return obj;
    }

    public static byte[] objectToByte(Object obj) {
        byte[] bytes = null;
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = null;
        try {
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                bo.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                oo.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return bytes;
    }
}

然后是服务端ServerHandler中的channelActive()方法

   public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        //这里可以是任意自定义对象
        User user = new User(1,"jing",123,"xxxxxx");
        //序列化位byte数组
        byte[] datas = ByteObjConverter.objectToByte(user);
        //写入channel
        ChannelFuture future = ctx.writeAndFlush(datas);
        //监听
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                ctx.close();
            }
        });
    }

再是客户端ClientHandler的ChannelRead()方法

 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf in = (ByteBuf) msg;

        byte[] bytes = new byte[in.readableBytes()];

        in.readBytes(bytes);
        //反序列化
        Object obj = ByteObjConverter.byteToObject(bytes);

        System.out.printf(obj.toString());

    }

说明:这里只给了部分代码,其它的代码与TimeServer差不多,读者可参考修改。然后运行。

 

五、处理链

通过之前的一个例子可以传递对象了,然而序列化和反序列都包含在了handler处理逻辑中了,这样看上去十分不优雅,也不利于维护。因此我们将序列化与反序列化提练出来。

首先需要一个UserEncoder.java,用于服务端发送数据时将User对象序列化,代码:

public class UserEncoder extends MessageToByteEncoder<User> {
    @Override
    //这里直接将我们要传的User对象当作参数传了进来
    protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {
        //序列化该User
        byte[] datas = ByteObjConverter.objectToByte(user);
        //传递到下一个Handler
        out.writeBytes(datas);
        //强制刷入
        ctx.flush();
    }
}

然后还需要一个UserDEcoder.java,用于客户端接收数据是,将User反序列化,代码:

//继承ByteToMessageDecoder,
//ByteToMessageDecoder也是一个ChannelInboundHandler
public class UserDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        byte[] bytes = new byte[in.readableBytes()];
        //读取字节
        in.readBytes(bytes);
        //反序列化
        Object obj = ByteObjConverter.byteToObject(bytes);
        //交给下一个handler
        out.add(obj);
    }
}

再是ClientServer.java 和ServerHandler.java

//UserServerHandler.java
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        System.out.println("已经成功过连接上 ");
        User user = new User(1,"jing",123,"xxxxxx");
        //这里直接写入对象
        ChannelFuture future = ctx.writeAndFlush(user);
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                ctx.close();
            }
        });
    }

//UserClientHandler.java
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //直接强转类型即可
        User msg1 = (User) msg;
        System.out.printf(msg1.toString());
    }

 

最后Client/Server启动器类,相应添加UserEncoder和UserDecoder

//UserClient.java
 bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new UserDecoder(),//这里添加
                            new UserClientHandler()
                    );
                }
            });



//UserServer.java
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new UserEncoder(),//这里添加
                            new UserServerHandler()
                    );
                }
            });

说明:先启动服务端,再启动客户端。连接成功后,客户端后台会输出接收到的User.

10-07 15:57