自研EventBus通信组件

研发背景

Flux1.0版本采用Ignite的message API进行数据路由,此方式主要有以下几个缺点:

EventBus特性

集群通信

基于TCP组件(后面拓展UDP组播等功能),实现集群间启动相互连接,服务端实现端口占用扫描启动:默认48880端口、如果端口占用则依次递增,最大端口号为:49000。节点启动后,客户端根据配置文件配置的集群IP自动进行端口扫描连接(48880->49000),同时客户端维护与服务端的心跳。避免节点宕机。

报文帧

批量压缩

分布式发布订阅匹配树

FluxMQ集群节点间维护一个Root级别的订阅,订阅会分为2种:

数据管理

会话消息

提供分布式会话消息,会话期间Session消息持久化存储,集群宕机后重启,数据不丢失,重启集群后数据重新加载

保留消息

根据Topic保留消息,每个TOPIC仅保留一条,当传输的MQTT payload为空时,则清空保留消息。数据持久化存储,重启集群后,数据重新加载

延迟消息

延迟Topic格式:

$DELAY/延迟秒指/TOPIC

配置持久化

基于Ignite的实现配置数据区持久化,目前持久化的数据内容有以下:

规则引擎

LOG文件打印

数据库SQL模板支持Json函数'

数据输入:

{
    "msg": {
      "id":"id",
      "body":{
        "state":1,
        "no":2
      }
    },
    "messageId": 1,
    "topic""test",
    "qos": 1,
    "retain"false,
    "time""2022 12-22 12:00:00",
    "clientId""A1212313"
}

此时我只想插入msg内容下的body结构体,以下是一个通用的插入SQL语句模板:

insert into table (clientId,topic,msg) values ('${clientId}','${topic}','${json(msg.body)}')

通过json(变量名) 方式给结构体转成json字符串替换成插入字段的值

多协议模块

目前FluxMQ内置了COAP、WEBSOCKET、I1协议的组件,可以指定端口启动,启动后,可以通过MQTT与协议组件之间交互。每个客户端必须按照FluxMQ的标准进行接入。扩展协议与FluxMQ的MQTT共享以下组件:

上行指令

select * from "$EVENT.EXTENSION"

传输的数据格式如下:

{
    "protocol""I1",
    "cmd""PUBLISH",
    "messageId": 0,
    "time""2023-07-11 21:59:23",
    "clientId""clientId",
    "nodeIp""127.0.0.1",
    "clientIp""127.0.0.1:19999",
    "body""body"
}

新增一个转发WEBSOCKET协议的报文

SQL如下:

select * from "$EVENT.EXTENSION WHERE protocol='WEBSOCKET'"

新增一个转发WEBSOCKET 上报协议的报文

SQL如下:

select * from "$EVENT.EXTENSION WHERE protocol='WEBSOCKET' AND cmd ='PUBLISH'

下行指令

通过MQTT客户端下发FluxMQ集群指令,即可将指令写给扩展协议客户端,格式如下:

$PROTOCOL/协议名称/{clientId}

连接管理

启动WEBSOCKET协议插件

WEBSOCKET客户端连接

ws://123.249.9.130:7777/test

连接管理

07-12 06:54