MQTT简介:

https://www.runoob.com/w3cnote/mqtt-intro.html

 

MQTT 3.1.1协议:

https://www.alibabacloud.com/help/zh/doc-detail/30540.htm?spm=a2c63.p38356.b99.43.172b5459d7wp7N

 

MQTT.fx工具连接阿里云操作参考流程:

https://blog.csdn.net/qq_45097019/article/details/107418853

 

ESP32 SDK下载地址:

https://www.espressif.com/zh-hans/support/download/sdks-demos

 

ESP32编程指南:

https://docs.espressif.com/projects/esp-idf/zh_CN/stable/index.html

 

ESP32 MQTT应用:

https://github.com/espressif/esp-mqtt

 


1. ESP32-MQTT及阿里云

MQTT ESP32连接阿里云流程及解析-LMLPHP

ESP32-MQTT的应用设计框架如上所示,主要由esp_mqtt_task和mqtt_event_handler两部分组成:

l esp_mqtt_task为一个状态机,负责MQTT会话状态的管理,及MQTT包的接收及其对应事件的发布

l mqtt_event_handler负责处理esp_mqtt_task发布的事件,完成基于MQTT协议的业务处理

 

连接阿里云需要做以下变更:

1. 修改app_main.c文件mqtt_app_start函数中mqtt_cfg变量的uri,client_id,username和password等成员变量

其中uri的字符串需要在最前面添加"mqtt://",具体值可参考开头资料的工具连接阿里云操作参考流程>,一个例子为"mqtt://$(ProductKey).iot-as-mqtt.cn-shanghai.aliyuncs.com"

2. 修改app_main.c文件mqtt_event_handler_cb函数中case MQTT_EVENT_CONNECTED的流程

主题发布可写为:esp_mqtt_client_publish(client,topic_pub_test,payload_test, 0, 0, 0);其中topic_pub_test为"/sys/$(ProductKey)/$(DeviceName)/thing/event/property/post",其中payload_test为"{\"params\":{\"CurrentTemperature\":20.3}}"

主题订阅可写为:esp_mqtt_client_subscribe(client,topic_sub_test, 0);,其中topic_sub_test为"/sys/$(ProductKey)/$(DeviceName)/thing/service/property/set"

 

2. 主要业务流程

2.1. CONNECT会话建立流程
MQTT ESP32连接阿里云流程及解析-LMLPHP
connect业务流程如上所示:

1) ESP32客户端的esp_mqtt_task发送CONNECT包至服务端进行会话连接

2) 服务端返回CONNACK

3) ESP32客户端的esp_mqtt_task接收CONNACK,解析其中的返回码确认会话连接是否建立成功,如建立成功则发送MQTT_EVENT_CONNECTED事件

4) ESP32客户端的mqtt_event_handler接收到MQTT_EVENT_CONNECTED事件并进行处理,DEMO的处理为进行主题订阅操作

 

2.2. SUBSCRIBE主题订阅流程
MQTT ESP32连接阿里云流程及解析-LMLPHP
subscribe业务流程如上所示:

1) ESP32客户端的esp_mqtt_task发送SUBSCRIBE包至服务器订阅主题,并将该SUBSCRIBE包添加至outbox等待队列上

2) 服务端返回SUBACK

3) ESP32客户端的esp_mqtt_task接收SUBACK包并将outbox等待队列上对应的项剔除,发送MQTT_EVENT_SUBSCRIBED事件

4) ESP32客户端的esp_mqtt_task如果超时未接收到SUBACK,则通过outbox等待列队重新发送SUBSCRIBE

5) ESP32客户端的mqtt_event_handler接收到MQTT_EVENT_SUBSCRIBED事件并进行处理

6) SUBSCRIBE订阅成功后当服务端的主题有信息更新时,发送PUBLISH包给客户端进行通知

 

2.3. 客户端PUBLISH QoS0主题信息推送流程
MQTT ESP32连接阿里云流程及解析-LMLPHP
客户端publish QoS0业务流程如上所示:

1) ESP32客户端发送PUBLISH包给服务端

 

2.4. 客户端PUBLISH QoS1主题信息推送流程
MQTT ESP32连接阿里云流程及解析-LMLPHP
客户端publish QoS1业务流程如上所示:

1) ESP32客户端发送PUSBLISH包给服务端,并将该PUBLISH包添加至outbox等待队列上

2) 服务端返回PUBACK

3) ESP32客户端的esp_mqtt_task接收PUBACK包并将outbox等待队列上对应的项剔除,发送MQTT_EVENT_PUBLISHED事件

4) ESP32客户端的esp_mqtt_task如果超时未接收到PUBACK,则通过outbox等待队列重新发送PUBLISH

5) ESP32客户端的mqtt_event_handler接收到MQTT_EVENT_PUBLISHED事件并进行处理

 

2.5.服务端PUBLISH QoS0主题信息推送流程
MQTT ESP32连接阿里云流程及解析-LMLPHP
服务端publish QoS0业务流程如上所示:

1) 服务端发送PUBLISH包

2) ESP32客户端的esp_mqtt_task接收PUBLISH,并发送MQTT_EVENT_DATA事件

3) ESP32客户端的mqtt_event_handler接收到MQTT_EVENT_DATA事件并进行处理

 

2.6.服务端PUBLISH QoS1主题信息推送流程

 MQTT ESP32连接阿里云流程及解析-LMLPHP
服务端publish QoS1业务流程如上所示:

1) 服务端发送PUBLISH包

2) ESP32客户端的esp_mqtt_task接收PUBLISH包并发送一个PUBACK回包,然后发送MQTT_EVENT_DATA事件

3) ESP32客户端的mqtt_event_handler接收到MQTT_EVENT_DATA事件并进行处理

4) 服务端如果未收到客户端发送的PUBACK,则会再发送同样的PUBLISH

 

3.代码流程解析

3.1.app_main

1) 通过esp_netif_init函数初始化WIFI协议栈

2) 通过esp_event_loop_create_default函数构建事件循环处理器用于WIFI协议栈的事件处理

3) 通过example_connect函数进行WIFI的网络连接

4) 通过mqtt_app_start函数开始MQTT应用

 

3.2.mqtt_app_start

1) 通过esp_mqtt_client_init函数初始化MQTT设备端

2) 通过esp_mqtt_client_register_event函数注册mqtt_event_handler至设备的事件循环处理器中

3) 通过esp_mqtt_client_start函数启动MQTT设备端

 

3.3.esp_mqtt_client_init

1) 通过esp_transport_list_init函数构建transport队列

2) 通过esp_transport_tcp_init函数构建TCP的transport节点

3) 通过esp_transport_list_add函数将TCP transport节点挂载至transport队列上,并设置其匹配的方案名为mqtt

4) 通过esp_mqtt_set_config函数进行MQTT的基础参数配置

5) 通过esp_event_loop_create函数构建设备的事件循环处理器

6) 申请设备的读缓冲区空间

7) 申请设备的写缓冲区空间

8) 通过outbox_init函数初始化设备的outbox队列

9) 通过xEventGroupCreate函数构建设备的事件组

10) 通过mqtt_msg_init函数配置设备的MQTT连接缓冲区为设备的写缓冲区

 

3.4.mqtt_event_handler

1) 通过mqtt_event_handler_cb函数进行MQTT事件的具体处理

 

3.5.mqtt_event_handler_cb

1) 进行MQTT事件的具体处理,例如在MQTT会话连接后进行主题订阅,主题发布等操作

 

3.6.esp_mqtt_client_start

1) 检查设备的状态,只有未进入过INIT状态才继续

2) 通过xTaskCreate函数启动esp_mqtt_task任务

 

3.7.esp_mqtt_task

1) 通过esp_transport_list_get_transport函数判断所使用的transport节点并返回,在前面的uri配置中我们配置的uri前缀为mqtt,transport的tcp进行注册时的关键字为mqtt,因此当前设备使用的transport使用tcp进行网络通信

2) 设置设备的状态为INIT初始化

3) 通过xEventGroupClearBits函数清空设备事件组的STOPPED_BIT位

4) 设置设备任务的运行标志位run为true,使用while循环进行任务循环处理

l MQTT_STATE_INIT状态:

1) 通过xEventGroupClearBits函数清空设备事件组的RECONNECT_BIT和DISCONNECT_BIT位

2) 通过esp_mqtt_dispatch_event_with_msgid函数发送MQTT_EVENT_BEFORE_CONNECT事件

3) 通过esp_transport_connect函数进行设备transport的连接,当前的transport为TCP则调用tcp_connect进行域名解析,socket申请与socket的connect操作

4) 通过esp_mqtt_connect函数进行MQTT协议的会话连接

5) 如会话连接成功,则通过esp_mqtt_dispatch_event_with_msgid函数发送MQTT_STATE_CONNECTED事件

l MQTT_STATE_CONNECTED状态:

1) 通过xEventGroupWaitBits函数获取事件组中的DISCONNECT_BIT位以判断是否为失连状态,是则中断设备会话的连接状态

2) 通过mqtt_process_receive函数进行MQTT包的读取及处理

3) 通过outbox_dequeue函数获取outbox队列中一个处于QUEUED状态的包,将该包通过mqtt_resend_queued函数进行发送,如果发送成功则通过outbox_set_pending函数将该outbox节点的状态更改为TRANSMITTED

4) 如果outbox队列中没有状态为QUEUED的包,则检查距离上次的发送间隔是否大于1秒,是则获取outbox队列中处于TRANSMITTED状态的包,检查当前时间是否超过包的发送时间大于1秒,是则进行重发

5) 检查会话是否达到会话的keepalive时间,如果达到并且未收到PINGRESP包,则认为会话失去连接,通过esp_mqtt_abort_connection函数重置会话进行重连,如果收到PINGRESP包则通过esp_mqtt_client_ping函数发送PINGREQ包以保持会话

6) 如果设置了会话自动刷新时间并达到目标时间则进行重连的重置以进行重连

7) outbox队列中超过30秒的项进行剔除

8) 检查outbox队列中项的总大小,如果超过限制大小则将其中状态为CONFIRMED的项进行剔除

l MQTT_STATE_WAIT_TIMEOUT状态:

1) 通过xEventGroupWaitBits函数获取RECONNECT_BIT位以等待会话的重连完成

 

3.8.esp_mqtt_connect

1) 通过mqtt_msg_connect函数构建CONENCT包

2) 通过esp_transport_write函数发送CONNECT包

3) 通过mqtt_message_receive函数接收一个MQTT包

4) 判断接收的MQTT包是否为CONNACK包,不是则异常返回

5) 通过mqtt_get_connect_return_code函数获取CONNACK包的返回类型

6) 如果CONNACK包的返回类型不为ACCEPTED,则发送一个MQTT_EVENT_ERROR事件并返回异常

 

3.9.mqtt_message_receive

1) 如果已读取的字节数为0,则读取一个字节用于读取固定包头的header字节,并解析header字节是否为有效的格式,不是则返回异常

2) 检查是否已读取header字节,并且剩余字节未读取完毕,剩余字节读取完毕通过检查剩余字节的字节数以及剩余字节的最高位是否为1进行判断

3) 通过剩余字节的内容计算可变包头加负载的数据大小

4) 检查可变包头加负载的数据大小是否超过读缓冲区空间的大小,如超过则判断包是否为PUBLISH包,不是则认为包的数据大小异常,返回异常,是PUBLISH包读取PUBLISH包的主题字符串长度部分(2字节),获取主题字符串总长度后判断读缓冲区的长度是否小于PUBLISH包除负载外的数据总长度,是则返回异常,否则设置数据总长度为读缓冲区的长度,只读取负载的部分内容

5) 检查已读取的数据长度是否小于总数据长度,小于则读取剩余数据至读缓冲区中返回0,大于或者等于则MQTT包读取完成,返回1

 

3.10.mqtt_process_receive

1) 通过mqtt_message_receive函数读取1个MQTT包

2) 判断收到的MQTT包类型

收到SUBACK包表示订阅成功,发布一个MQTT_EVENT_SUBSCRIBED事件

收到PUBLISH包表示订阅的主题收到数据,通过deliver_publish函数进行处理并发布MQTT_EVENT_DATA事件,并根据QoS的设置发送ACK包

收到PUBACK包表示设备发送的Qos大于0的PUBLISH包服务器已收到,将对应的outbox中的项状态设置为CONFIRMED,并发布MQTT_EVENT_PUBLISHED事件

收到PINGRESP包说明服务器收到设备发送的PINGREQ包,该会话连接为正常,设置等待PINGRESP包标志位为true

 

3.11.esp_mqtt_client_publish

1) 通过mqtt_msg_publish函数构建PUBLISH包

2) 如果Qos大于0,则通过mqtt_enqueue函数将该PUBLISH包保存至outbox队列中

3) 通过mqtt_write_data函数发送该PUBLISH包

4) 如果QoS大于0,则设置对应的outbox项时间为当前时间,状态为TRANSMITTED

 

3.12.esp_mqtt_client_subscribe

1) 通过mqtt_msg_subscribe函数构建SUBSCRIBE包

2) 通过mqtt_enqueue将该SUBSCRIBE包添加至outbox队列上

3) 设置对应的outbox项状态为TRANSMITTED

4) 通过transport接口发送SUBSCRIBE包

 

 

 

09-28 10:10