前言:Vert.x 实现了2种完成不同的eventBus:

EventBusImpl(A local event bus implementation)和 它的子类 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。这里介绍下EventBusImpl

 

EventBusImpl 原理:调用consumer方法时,以address-handler作为k-v存在一个map的容器中。接着调用send方法时,把message,DeploymentOptions等内容封装成对象(MessageIml,命令模式),从以address为k从map里取出handler.把MessageIml作为参数传递给handler运行。

 

一.初始化: 

初始化过程就是new  EventBusImpl,并修改状态变量started。

首先,在VertxImpl的构造方法

VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)

中进行初始化。以  options.isClustered()为判断条件,调用createAndStartEventBus(options, resultHandler);

其次createAndStartEventBus中做了2件事

1.以options.isClustered()判断条件,new出了ClusteredEventBus/ EventBusImpl. new时并没有业务逻辑。(额外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互拥有对方的引用,是很常见的写法。)

2.调用EventBusImpl的初始化方法start(),并返回结果给最外层resultHandler的。start()更没做什么事,只是EventBusImpl里面有个状态变量started。把它置为true.

 

二. consumer订阅

EventBusImpl维护了

protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>()

成员变量。

Handlers 是一个handler的List的封装类,上面可以理解为 ConcurrentMap<String, List<Handler>>这种数据结构。consumer方法以address为k,以handler作v的list的一员,存放在handlerMap中。

所以重点关注对handlerMap的操作。

 

调用vertx.eventBus().consumer("Address1", ar -> {});发生了什么?

查看代码发现,先new HandlerRegistration这里也有相互引用。再调用HandlerRegistration .handler,那里面又会调用eventBusImpl.addRegistration()。在HandlerRegistration这个类兜了一圈,又回到eventBusImpl里。

(相关代码截断如下:  EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);

核心逻辑在addRegistration() 和 addLocalRegistration()中。我的理解是,前个方法明显有问题。最后一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的参数都没有使用,应该可以省略,修改为addRegistration(registration::setResult);就可以。很少在Vert.x框架中看到这样不合规范的代码。如果读者有好的见解,欢迎留言。

 

// 调用 addLocalRegistration

// 注册完成

protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
                                   boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(registration.getHandler(), "handler");
  boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
  addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

 

/** *

* 初始化 或 获取原 Contex

初始化 或 获取原 Handlers

* 新建  HandlerHolder

* Handlers 里添加  HandlerHolder

**/

protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
                                           boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  Context context = Vertx.currentContext();
  boolean hasContext = context != null;
  if (!hasContext) {
    // Embedded
    context = vertx.getOrCreateContext();
  }
  registration.setHandlerContext(context);

  boolean newAddress = false;

  HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);

  Handlers handlers = handlerMap.get(address);
  if (handlers == null) {
    handlers = new Handlers();
    Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
    if (prevHandlers != null) {
      handlers = prevHandlers;
    }
    newAddress = true;
  }
  handlers.list.add(holder);

  if (hasContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

  return newAddress;
}

新出现的几个类的作用:

Context 线程调度--Vert.x框架的优点是线程安全,就是通过Context实现。

HandlerHolder--对HandlerRegistration的封装,外加Context。

Handlers--上面HandlerHolder 的集合封装,外加平衡轮询逻辑。

 

handlers.list.add(holder);这句作为压轴(戏曲名词,指一场折子戏演出的倒数第二个剧目)出场完成整个功能的核心注册操作。

至于后面的那段代码,我觉得有点问题。

if (hasContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

作用是在context上注册关闭事件,由DeploymentManager在unploy的时候调用,对应的核心逻辑在 CloseHooks.run()方法中。但这个这个判断条件案例只有第2次添加consumer的时候才有效果。或者是上面的代码boolean hasContext = context != null;给人的误导? 以上consumer的流程还被reply方法使用。

 

三. Send/Publish发送

多个send重载方法最后定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但这个核心方法的最终却调用了一个

名为sendOrPubInternal的方法,不由得在让人想起写程序最难的事之一是命名。正如开头说的这个使用了设计模式中的命令模式,把参数封装成MessageImpl对象发送到后面的方法。

sendOrPubInternal做了3个事情,

1.createReplyHandlerRegistration -- 有replyHandler.reply()这步才有意义

2.new SendContextImpl -- 从Context类判断,SendContextImpl可以绑定线程

3.sendContext.next(); -- 在执行方法前,执行拦截器。拦截器极大地丰富开发人员的自定义使用。

本来应该1,2,3顺序介绍代码,但是消息流程一般是:

Sender----(    message  )--->customer;

Sender<---(reply message)---customer;

根据这个流程,得先介绍2.new SendContextImpl 和3.sendContext.next();

再回头介绍 1.createReplyHandlerRegistration

 

 

先说 2.new SendContextImpl

这个类是整个Send相关类的大封装。

3.sendContext.next();

根据代码流程

sendOrPub--》deliverMessageLocally--》deliverMessageLocally

进入到deliverMessageLocally(),这个方法做了2个大事情。

  1. 获取address所对应的所有handlers
  2. 根据isSend()区分 send (平衡轮询发一个handler)/publish(遍历handlers发给所有)

方法的第一句话msg.setBus(this);和reply逻辑有关系。在这个local eventbus下,是重复赋值,没有作用的。

然后Handlers handlers = handlerMap.get(msg.address());

这句根据以address为k,取出Handlers。sender的messageImpl 终于和consumer的HandlerHold见面

 

Handler.choose()方法实现了轮询发送message, 个人认为这个方法叫做 balanceChoose()更好。

代码如下:

public HandlerHolder choose() {
  while (true) {
    int size = list.size();
    if (size == 0) {
      return null;
    }
    int p = pos.getAndIncrement();
    if (p >= size - 1) {
      pos.set(0);
    }
    try {
      return list.get(p);
    } catch (IndexOutOfBoundsException e) {
      // Can happen
      pos.set(0);
    }
  }
}

当时我使用Vert.x的时候,就很好奇eventBus的轮询功能怎么实现。现在看到其实非常简单。维护一个 AtomicInteger 的变量,每次调用累加一次。如果超过List的长度,则重置为0,方法永远返回 list.get(p)。巧妙!

最后在deliverToHandler()方法里,在Context的线程控制下,完成message和handler的最终交互。

 

那么,回到最开始的问题,

Sender----(    message  )--->Customer;

Sender<---(reply message)---Customer;

在上面的流程中,Sender根据address找到Customer从而发送message,那么Customer的reply是怎么找到Sender的呢?

答案是一个临时的replyAddress。通过以 replyAddress为key,把Sender作为handler注册到eventBusImpl上,处理后直接注销。replyAddress的规律是从1开始的步长为1的自增数列,所以开发者不应该使用纯数字作为自身业务的Address,避免冲突。

 

最后说说1.createReplyHandlerRegistration

如果sender在发送消息时使用了

send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。

vertx.eventBus().send("address1", "测试消息", ar -> {
    if (ar.succeeded()) {
        System.out.println("我是producer1:" + ar.result().body());
    }
});

并且consumer在接受消息到后,调用了 reply();

vertx.eventBus().consumer("address1", ar -> {
    System.out.println("consumer:" + ar.body());
    ar.reply("consumer reply message ");
});

则会进入createReplyHandlerRegistration的处理逻辑。

使用

protected String generateReplyAddress() {
  return Long.toString(replySequence.incrementAndGet());
}

这里产生从1开始的步长为1的自增数列address。

Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
HandlerRegistration<T> registration =
  new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);

里面的this是eventBusImpl,并在handler()方法里把 boolean replyHander的值置为true.

这样,eventBusImpl的handlerMap变量里,就多了<replyAddress, replyHander>。

在cuomser处调用reply()后,会在eventBusImpl的内部类ReplySendContextImpl<T> extends SendContextImpl 的参与下,走类似send()的流程。区别是最后在deliverToHandler()方法里,会判断boolean  replyHander的值,如果是true调用完毕就注销.

 

错误代码测验:

vertx.eventBus().consumer("1", ar -> {
    System.out.println("我不应该在这里" + ar.body());
    ar.reply("对不起,其实我是阿杜。");
});
vertx.eventBus().consumer("address1", ar -> {
    System.out.println("consumer:" + ar.body());
    ar.reply("我是高帅富");
});
vertx.eventBus().send("address1", "测试消息", ar -> {
    if (ar.succeeded()) {
        System.out.println("sender:接收收到的回应是:"+ar.result().body());
    }else{
        System.out.println("发送失败");
    }
});

存在consumer("1", ar -> {})的Console:

consumer:测试消息

我不应该在这里我是高帅富

20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024

20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096

发送失败

可以看到上面的输出完全不是设想的结果。

 

如果不存在consumer("1", ar -> {})address为1的Console:

consumer:测试消息
sender:接收收到的回应是:我是高帅富

最后,再次提醒:使用eventBus时,不要使用纯数字作为自身业务的address。

03-20 07:30