前言

之前在《阿里开源分布式事务组件 seata :seata server 通信层解析》这篇文章中,站在 server 端的角度简单分析过 seata 的网络通信模块。
上篇文章中提到了一些概念,例如说:异步转同步的 Future 机制,防洪机制,以及消息队列机制等,并非只是服务端才具有的功能特性,实际上对于客户端来说也同样适用,它们是整个通用的通信机制。
这篇文章讲讲 client 角度的通信细节,以及在上篇文章中未提及的一些关键点,算是对 seata 整个 client-server 通信模块的一个总结。
已经在上篇文章提及的概念在这篇文章中就不再赘述。

客户端角色

在 seata 中,它对客户端是有角色区分的,它把客户端分为两种角色,分别是 TM 和 RM。
在 seata 的设计中, TM 是全局事务的发起者, RM 则是全局事务的参与者。
这两种角色在进行启动的时候,都要向 server 进行注册,这个注册也可以等同于一个认证的作用,只有已经注册的客户端才可以进行其它操作。
不过注册的意义不仅仅在于认证,特别是对于 RM 来说,很关键的一点是要向 server 注册它的 resourceId,这一点十分重要,它关系到 RM 的高可用问题。
如果某个 RM 停机了,而全局事务还未进行完,这个时候就可以采用委托给其它 RM 的方式,保证全局事务正常结束,这里一个关键的地方,就是 resourceId。
这篇文章暂时不在这里展开,后面再单独写一写这里的设计。

《阿里开源分布式事务组件 seata :seata server 通信层解析》这篇文章中,我们已经看到,无论是客户端还是服务端的实现,它们都以 AbstractRpcRemoting 作为抽象骨架。
AbstractRpcRemoting 这个类包含了客户端和服务端的公共逻辑,包括

  1. 消息的发送和接收逻辑
  2. future 机制
  3. 防洪机制

连接池

这些在前面的文章里已经有提到过了。
客户端的实现细节主要是在 AbstractRpcRemotingClient 类里,它继承自 AbstractRpcRemoting 类。
因为使用的是 netty,所以必然会有 Bootstrap 逻辑,seata 把这一块的逻辑单独放到 RpcClientBootstrap 这个类里去,然后只对外提供一个 getNewChannel 方法。
这样来看,创建连接的细节已经被良好的封装,并且 RpcClientBootstrap 的职责也非常明确,那就是建立新连接。
不过大多数客户端都会引入连接池的设计,以达到对连接的重用,避免频繁创建和销毁连接带来的额外开销。seata 也用了连接池管理机制。
不过 seata 的连接池管理是借助 apache 的 common pools 来实现的,common pools 是一个对象池,这里把一条连接 channel 当作一个对象,就变成了连接池了。
借助 common pools 实现对象池,需要实现 KeyedPoolableObjectFactory 接口对应的相关方法,关键是以下三个方法:

V makeObject(K key) throws Exception; //方法定义了新对象是如何创建的。
void destroyObject(K key, V obj) throws Exception; //方法定义了对象应该怎么样去销毁。
boolean validateObject(K key, V obj);  //方法定义了如何检测对象的有效性。

另外还要指定对象的类型,以及用于存取对象的 key 的类型。

NettyPoolableFactory 这个类便实现了 KeyedPoolableObjectFactory 接口,并且定义了对象的类型是 Channel,而 key 是 NettyPoolKey。
makeObject 的实现主要分解为几个步骤,第一个是建立新连接:

  InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
  if (LOGGER.isInfoEnabled()) {
   LOGGER.info("NettyPool create channel to " + key);
  }
  Channel tmpChannel = clientBootstrap.getNewChannel(address);
  long start = System.currentTimeMillis();
  Object response;
  Channel channelToServer = null;
  if (null == key.getMessage()) {
   throw new FrameworkException(
     "register msg is null, role:" + key.getTransactionRole().name());
  }

接下来,对连接进行注册,如果注册失败,则会抛出异常,并关闭刚刚建立的连接,这意味着一个连接建立以后,必须注册成功才能使用。如下代码所示:

  try {
   response = rpcRemotingClient.sendAsyncRequestWithResponse(tmpChannel, key.getMessage());
   if (!isResponseSuccess(response, key.getTransactionRole())) {
    rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
   } else {
    channelToServer = tmpChannel;
    rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
      key.getMessage());
   }
  } catch (Exception exx) {
   if (tmpChannel != null) {
    tmpChannel.close();
   }
   throw new FrameworkException(
     "register error,role:" + key.getTransactionRole().name() + ",err:" + exx.getMessage());
  }

在上面我们也可以看到,注册消息是通过 NettyPoolKey 携带进来的。

销毁对象池的对象,在这里,其实就是关闭连接,而验证对象的有效性,其实就是验证连接的有效性。这两个方法的实现比较简单:

 @Override
 public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {

  if (null != channel) {
   if (LOGGER.isInfoEnabled()) {
    LOGGER.info("will destroy channel:" + channel);
   }
   channel.disconnect();
   channel.close();
  }
 }


 @Override
 public boolean validateObject(NettyPoolKey key, Channel obj) {
  if (null != obj && obj.isActive()) {
   return true;
  }
  if (LOGGER.isInfoEnabled()) {
   LOGGER.info("channel valid false,channel:" + obj);
  }
  return false;
 }

连接管理

NettyClientChannelManager 这个类集成连接池机制和建立新连接的实现以后,成为了一个连接管理器。
当根据内部逻辑,需要向服务端发送信息时,都会通过 NettyClientChannelManager 获取连接。
虽然 NettyClientChannelManager 已经集成了一个连接池,不过它还是内部维护了 channel 的缓存

private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();

当外部调用它来获取连接时,它先从自己的缓存里找,如果找到了,先验证channel 是否有效性,如果有效,直接返回这个 channel。
如果 channel 已经无效,或者没有缓存,那么就会委托连接池去建立新连接。这些细节在它的 acquireChannel 方法里:

    /**
     * Acquire netty client channel connected to remote server.
     *
     * @param serverAddress server address
     * @return netty channel
     */
    Channel acquireChannel(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null) {
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (null != channelToServer) {
                return channelToServer;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to " + serverAddress);
        }
        channelLocks.putIfAbsent(serverAddress, new Object());
        synchronized (channelLocks.get(serverAddress)) {
            return doConnect(serverAddress);
        }
    }

这里也可以看到,如果需要建立新连接,是需要加锁做同步的,NettyClientChannelManager 内部也维护了一串锁。
下面看看建立连接的细节,前面我们说到 注册消息是通过 NettyPoolKey 携带进来的。
在建立连接时,需要生成 NettyPoolKey, seata 在这里应用了 java8 的一个新的特性 Function,即函数式接口。
也就是说,一个 NettyPoolKey 如何生成,由外部传入一个 函数 来决定的。并不需要 NettyClientChannelManager 本身去关心,这个生成方式相关的变量就是

    private Function<String, NettyPoolKey> poolKeyFunction;

它作为 NettyClientChannelManager 的构造函数的一个参数被传进来。
对于 RM Client 来说,它的 NettyPoolKey 的生成方式如下所示:

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return (serverAddress) -> {
            String resourceIds = customerKeys == null ? getMergedResourceKeys() : customerKeys;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("RM will register :" + resourceIds);
            }
            RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
            message.setResourceIds(resourceIds);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.RMROLE, serverAddress, message);
        };
    }

它同时也决定了 NettyPoolKey 里面携带的 message 到底是 RegisterRMRequest 还是 RegisterTMRequest。

同理,对于 TM Client 来说,它也有它自己的 NettyPoolKey 生成规则:

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return (severAddress) -> {
            RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
        };
    }

函数式编程的特点突然给人一种 IoC 的感觉。

那么回到 NettyClientChannelManager 建立连接的逻辑,上面我们已经解释过一些函数式和 NettyPoolKey 的背景了,直接看它的 doConnect 方法:

    private Channel doConnect(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return channelToServer;
        }
        Channel channelFromPool;
        try {
            NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
            NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
            if (null != previousPoolKey && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
                RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
                ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
            }
            channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
            channels.put(serverAddress, channelFromPool);
        } catch (Exception exx) {
            LOGGER.error(FrameworkErrorCode.RegisterRM.getErrCode(), "register RM failed.", exx);
            throw new FrameworkException("can not register RM,err:" + exx.getMessage());
        }
        return channelFromPool;
    }

可以看到,其实逻辑没什么特别的,就是去 对象池里新生成个对象,创建一个新的连接而已。不过这里也有可以注意的地方:
NettyClientChannelManager 不仅缓存了连接,维护了锁,它也缓存 NettyPoolKey,如果它发现要新建连接,并且 NettyPoolKey 在之前就缓存过的话,它会看看是否是 RM Client 建立的连接,如果是,那么需要更新一下 resource ids。前面我们有说到,RM 注册时,注册 resource ids 是非常重要的。

另外,NettyClientChannelManager 里还有一个 reconnect 方法,它是在 AbstractRpcRemotingClient 初始化的时候,作为一个定时任务运行的

    @Override
    public void init() {
        clientBootstrap.start();
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
        mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        mergeSendExecutorService.submit(new MergedSendRunnable());
        super.init();

它的作用,实际上是去注册中心查找当前服务组下有哪些可用的 server,定时检查与这些 server 之间的连接是否可用,如果不可用,则立马新建连接并缓存。

消息打包

在这个里我们也可以看到 mergeSendExecutorService 这个线程池,之前的文章《阿里开源分布式事务组件 seata :seata server 通信层解析》讲过这里,实际上它是消息队列机制,用来合并消息,再进行发送。我们在之前的文章里也提示过,seata server端的实现里,并没有定时去清除 basketMap,实际上是因为 server 端在通信时,根本没有用到这个 basketMap。
而客户端则有可能会用到,所以这里专门有一个线程池,去清空 basketMap 里的打包消息。

对于 seata 的通信模式和细节大体上就这些,本文到次结束。

03-05 23:19