一、背景描述

使用Netty进行两台或者多台服务器之间的数据通信,大体有以下三种情况:

  1. 使用长连接通道不断开的方式进行通信。也就是服务器和客户端的通道不断开,一直处于开启状态,如果服务器的性能足够好,并且我们的客户端数量足够少的情况下,推荐这种方式。
  2. 一次性批量提交数据,推荐采用短连接方式。即我们可以把数据保存在本地临时缓冲区或者临时表中,当到达一定临界值的时候一次性批量提交,或者是根据定时任务轮询提交,这种方式的弊端是做不到实时性,在对实时性要求不高的应用程序中推荐使用。
  3. 我们可以使用一种特殊的长连接,在指定的一段时间内,服务器与某台客户端么有进行任何通信,则断开连接。下次客户端向服务器端发送请求时再次建立连接,

3这种模式需要考虑两种因素:

  • 3.1 如何在服务器端和客户端在一定时间超时后关闭通道?关闭通道后如何再建立连接?
    答案:可以使用nettyReadTimeoutHandler,在一定时间内没读取到数据则断开连接;再次建立连接直接发起请求即可。

  • 3.2 客户端宕机时我们无需考虑,下次客户端重启后就可以与服务器建立连接;但是服务器宕机后,我们客户端如何与服务端建立连接?
    答案无非是隔一段事件轮询建立连接。


二、代码示例

nettyReadTimeOut实现方案3

  • 服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

    public static void main(String[] args) throws Exception{
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         //设置日志
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                //设置服务端的超时时间
                sc.pipeline().addLast(new ReadTimeoutHandler(5));
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        ChannelFuture cf = b.bind(8765).sync();
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}

  • ServerHandler
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class ServerHandler extends ChannelHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request)msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getId());
        response.setResponseMessage("响应内容" + request.getId());
        ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }
   @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  • client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;
public class Client {
    private static class SingletonHolder {
        static final Client instance = new Client();
    }

    public static Client getInstance(){
        return SingletonHolder.instance;
    }

    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf ;

    private Client(){
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
                        sc.pipeline().addLast(new ReadTimeoutHandler(5));
                        sc.pipeline().addLast(new ClientHandler());
                    }
            });
    }

    public void connect(){
        try {
            this.cf = b.connect("127.0.0.1", 8765).sync();
            System.out.println("远程服务器已经连接, 可以进行数据交换..");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public ChannelFuture getChannelFuture(){

        if(this.cf == null){
            this.connect();
        }
        if(!this.cf.channel().isActive()){
            this.connect();
        }
        return this.cf;
    }
    public static void main(String[] args) throws Exception{
        final Client c = Client.getInstance();
        //c.connect();

        ChannelFuture cf = c.getChannelFuture();
        //客户端每隔4s钟向服务器端发数据:
        for(int i = 1; i <= 3; i++ ){
            Request request = new Request();
            request.setId("" + i);
            request.setName("pro" + i);
            request.setRequestMessage("数据信息" + i);
            cf.channel().writeAndFlush(request);
            TimeUnit.SECONDS.sleep(4);
        }
        cf.channel().closeFuture().sync();
        //让客户端断开后可以重新连接上
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("进入子线程...");
                    ChannelFuture cf = c.getChannelFuture();
                    System.out.println(cf.channel().isActive());
                    System.out.println(cf.channel().isOpen());
                    //再次发送数据
                    Request request = new Request();
                    request.setId("" + 4);
                    request.setName("pro" + 4);
                    request.setRequestMessage("数据信息" + 4);
                    cf.channel().writeAndFlush(request);
                    cf.channel().closeFuture().sync();
                    System.out.println("子线程结束.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("断开连接,主线程结束..");
    }
}

main方法,可以发现for(int i = 1; i <= 3; i++ ) 这个循环中,每个循环停顿4秒,也就是每隔4秒发送一次请求,而服务器端的超时时间设置为5秒,那么在这个for循环期间连接是不会断开的,等for循环结束cf.channel().closeFuture().sync(); 断开连接this.cf.channel().isActive() 变为否,在new Thread()中再次发送请求,getChannelFuture会重新建立连接


  • clientHandler
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class ClientHandler extends ChannelHandlerAdapter{
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {

   }
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       try {
           Response resp = (Response)msg;
           System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
       } finally {
           ReferenceCountUtil.release(msg);
       }
   }
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

   }
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       ctx.close();
   }
}

  • Marshalling工厂
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
public final class MarshallingCodeCFactory {
    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //创建了MarshallingConfiguration对象,配置了版本号为5 
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根据marshallerFactory和configuration创建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
  • 其余的两个传输对象
public class Request implements Serializable{

	private static final long  SerialVersionUID = 1L;

	private String id ;
	private String name ;
	private String requestMessage ;
	……
}
public class Response implements Serializable{

	private static final long serialVersionUID = 1L;

	private String id;
	private String name;
	private String responseMessage;
	……
}

参考

https://blog.csdn.net/shengqianfeng/article/details/80809413
https://www.cnblogs.com/sigm/p/6372520.html

10-05 16:47