【Ai-WB2高级篇】MQTT 协议连接

[复制链接]
查看411 | 回复5 | 5 天前 | 显示全部楼层 |阅读模式
本帖最后由 lovzx 于 2024-9-14 12:01 编辑

【Ai-WB2高级篇】MQTT 协议连接



MQTT
MQTT(Message Queuing Telemetry Transport)是一种轻量级适用于资源受限的设备下使用,特别是嵌入式领域

MQTT特点
  • 轻量级:MQTT开销低、报文小非常适合资源受限的设备
  • 可靠:MQTT支持多种QoS等级、会话感知和持久连接
  • 安全通信: 支持TLS和SSL加密功能,还可以通过用户名/密码方式进行认证
  • 双向通信: MQTT基于**发布-订阅**模式为设备之间提供了无缝双向通信,既可以发布消息也可以订阅消息,无须设备之间相互耦合,易于扩展
  • 大规模物联网设备支持:MQTT轻量级、低带宽消耗,通过发布-订阅模式解耦设备与设备之间的关系,从而有效的网络流量和资源的使用
  • 语言支持: 编程语言支持多

MQTT基本组件
发布订阅模式: 为了解决物联网设备之间的解耦问题,MQTT采用了发布-订阅模式,与传统的客户端-服务端的模式不同,MQTT发布者和订阅者无须知道对方的存在,无须建立直接的连接,通过MQTT Broker来负责消息的路由和分发
mqtt消息分发.png
MQTT设备将温度数据发布到指定的Temperature主题,MQTT Broker收到消息后会将改数据转发给该主题的订阅者

主题topic
MQTT通过topic对发布和订阅的客户端进行管理,类似于分组,发布者可以发送订阅topic的所有组内成员,topic通过/来分割,支持+和#两种通配符
  • +: 表示单层匹配,例如:/a/+,匹配/a/b,和a/c
  • #: 表示多层匹配,例如:a/#,匹配/a/x,/a/b/c

需要注意的是通配符只能用于订阅,不能用于发布

服务质量 QoS
为了适应不同的网络环境,MQTT提供了三种服务质量等级
  • QoS 0: 消息最多传送一次。如果客户端不可用,将丢失这条消息
  • QoS 1:消息至少传送一次,保证可以收到消息,但可能会重复
  • QoS 2: 消息只传递一次,保证即不丢失也不重复

QoS等级从低到高消息可靠性增高,传输复杂度也提升

MQTT工作流程
客户端通过TCP/IP协议与MQTT Broker建立连接,可以选择TLS/SSL加密来实现,建立连接后既可以发布指定的主题也可以定于指定的主题,客户端发布时消息需要指定topic和QoS,Broker收到消息后会根据不同的QoS等级按照不通过的策略将消息分发给该主题的订阅者们

MQTT客户端
MQTTX是一款免费开源的全平台客户端,完全兼容MQTT协议MQTT官网

免费公共MQTT服务器
MQTT免费服务器
服务器信息
  • 地址:broker.emqx.io
  • mqtt端口:1883
  • ws端口:8083
  • mqtts端口:8883
  • wss端口:8084

证书文件可以从官方手动下载
mqttx设置连接.png

连接成功后可以设置订阅内容或者发送消息
mqttx发送消息.png

代码编写
MQTT需要用到lwip和网络相关的组件,需要在Makefile配置文件中添加相关的依赖
  1. COMPONENTS_BLSYS := bltime blfdt blmtd bl_os_adapter rfparam_adapter_tmp bl602_os_adapter bl602 bl602_std
  2. COMPONENTS_LOOPS := bloop loopadc looport yloop loopset looprt
  3. COMPONENTS_VFS := romfs
  4. COMPONENTS_NETWORK := sntp dns_server lwip lwip_dhcpd
  5. COMPONENTS_MQTT := axk_common http-parser axk_tls axk_mqtt wpa_supplicant tcp_transport blcrypto_suite
  6. COMPONENTS_WIIF := wifi wifi_manager wifi_hosal

  7. INCLUDE_COMPONENTS += freertos_riscv_ram newlibc
  8. INCLUDE_COMPONENTS += mbedtls_lts vfs hosal coredump blog
  9. INCLUDE_COMPONENTS += utils cli cjson

  10. INCLUDE_COMPONENTS += $(COMPONENTS_LOOPS)
  11. INCLUDE_COMPONENTS += $(COMPONENTS_BLSYS)
  12. INCLUDE_COMPONENTS += $(COMPONENTS_VFS)
  13. INCLUDE_COMPONENTS += $(COMPONENTS_NETWORK)
  14. INCLUDE_COMPONENTS += $(COMPONENTS_WIIF)
  15. INCLUDE_COMPONENTS += $(COMPONENTS_MQTT)

  16. INCLUDE_COMPONENTS += $(PROJECT_NAME)
复制代码

在获取到IP地址的地方开始mqtt的连接,使用的是sdk里面提供的axk_mqtt组件,其配置连接代码如下
  
  1. axk_mqtt_client_config_t mqtt_cfg = {
  2.         .uri = "ws://broker.emqx.io:8083/mqtt",
  3.         .event_handle = event_cb,

  4.         //指定客户端名称
  5.         .client_id = "axk mqtt",
  6.         //设置服务器证书文件
  7.         // .cert_pem = CA_CRT,
  8.         // .cert_len = strlen(CA_CRT) + 1,
  9.         // .username = "wb2",
  10.         // .password = "wb2",
  11.         
  12.         //客户端证书
  13.         // .client_cert_pem = CLIENT_CA,
  14.         // .client_key_pem = CLIENT_KEY,

  15.         
  16.     };

  17.     axk_mqtt_client_handle_t client = axk_mqtt_client_init(&mqtt_cfg);
  18.     axk_mqtt_client_start(client);
复制代码

可以通过event->event_id来判断消息的类型,常用的的类型有一下几种
  • MQTT_EVENT_CONNECTED:连接到mqtt服务器
  • MQTT_EVENT_DISCONNECTED:断开连接
  • MQTT_EVENT_SUBSCRIBED:订阅成功
  • MQTT_EVENT_PUBLISHED:发送成功
  • MQTT_EVENT_DATA:接收到消息
MQTT_EVENT_DATA用来接收消息,通过event->topic来判断是什么类型的消息,从而做出对应的操作,event->data是消息的内容

常用的函数

  1. //初始化mqtt配置,并返回一个client对象
  2. axk_mqtt_client_handle_t axk_mqtt_client_init(const axk_mqtt_client_config_t *config);

  3. //重新连接
  4. axk_err_t axk_mqtt_client_reconnect(axk_mqtt_client_handle_t client);

  5. //断开连接
  6. axk_err_t axk_mqtt_client_disconnect(axk_mqtt_client_handle_t client);

  7. //设置client连接地址
  8. axk_err_t axk_mqtt_client_set_uri(axk_mqtt_client_handle_t client, const char *uri);

  9. //开始启动mqtt client
  10. axk_err_t axk_mqtt_client_start(axk_mqtt_client_handle_t client);

  11. //设置client配置
  12. axk_err_t axk_mqtt_set_config(axk_mqtt_client_handle_t client, const axk_mqtt_client_config_t *config);

  13. //订阅主题
  14. int axk_mqtt_client_subscribe(axk_mqtt_client_handle_t client, const char *topic, int qos);

  15. //取消订阅主题
  16. int axk_mqtt_client_unsubscribe(axk_mqtt_client_handle_t client, const char *topic);

  17. //发送主题消息
  18. int axk_mqtt_client_publish(axk_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);

  19. //注册回调事件
  20. axk_err_t axk_mqtt_client_register_event(axk_mqtt_client_handle_t client, axk_mqtt_event_id_t event, axk_event_handler_t event_handler, void *event_handler_arg);
复制代码

案例
通过mqtt协议来控制wb2板载led灯,首先创建wb2_led.h,代码如下
  1. #ifndef _WB2_LED_H
  2. #define _WB2_LED_H

  3. #include <stdint.h>

  4. #define PIN_R   14
  5. #define PIN_G   17
  6. #define PIN_B   3

  7. #ifdef __cplusplus
  8. extern "C" {
  9. #endif

  10.     /**
  11.      * 初始化led
  12.      * @param  pin
  13.      */
  14.     void wb2_led_init(uint8_t pin);
  15.     /**
  16.      * 打开指定颜色的灯
  17.      * @param  led     r、g、b
  18.      */
  19.     void wb2_led_open(const char* led);
  20.     /**
  21.      * 关闭指定颜色的灯
  22.      * @param  led     r、g、b
  23.      */
  24.     void wb2_led_close(const char* led);

  25.     /**
  26.      * 获取指定灯的状态
  27.      * @param  led     r、g、b
  28.      * @return int 0 close,1open
  29.      */
  30.     uint8_t wb2_led_status(const char* led);

  31. #ifdef __cplusplus
  32. }
  33. #endif

  34. #endif
复制代码
创建wb2_led.c,代码实现如下
  1. #include "wb2_led.h"
  2. #include <bl_gpio.h>
  3. #include <string.h>
  4. #include <stdint.h>
  5. #include <bl602.h>
  6. #include <glb_reg.h>
  7. #include <stdio.h>

  8. /**
  9. * bit0: b
  10. * bit1: g
  11. * bit2: r
  12. */
  13. static uint8_t rgb = 0;


  14. void update_status(uint8_t pin, uint8_t data)
  15. {
  16.     int mask = 0;
  17.     switch (pin)
  18.     {
  19.         case PIN_R:
  20.             mask = 2;
  21.             break;
  22.         case PIN_G:
  23.             mask = 1;
  24.             break;
  25.         case PIN_B:
  26.             mask = 0;
  27.             break;
  28.     }
  29.     if (data) {
  30.         rgb |= 1 << mask;
  31.     }
  32.     else {
  33.         rgb &= ~(1 << mask);
  34.     }
  35. }

  36. static int get_pin(const char* led)
  37. {
  38.     int pin = -1;
  39.     if (strcmp(led, "r") == 0) {
  40.         pin = PIN_R;
  41.     }
  42.     else if (strcmp(led, "g") == 0) {
  43.         pin = PIN_G;
  44.     }
  45.     else if (strcmp(led, "b") == 0) {
  46.         pin = PIN_B;
  47.     }
  48.     return pin;
  49. }

  50. /**
  51. * 打开指定颜色的灯
  52. * @param  led     r、g、b
  53. */
  54. void wb2_led_open(const char* led)
  55. {
  56.     if (get_pin(led) != -1) {
  57.         update_status(get_pin(led), 1);
  58.         bl_gpio_output_set(get_pin(led), 1);
  59.     }

  60. }
  61. /**
  62. * 关闭指定颜色的灯
  63. * @param  led     r、g、b
  64. */
  65. void wb2_led_close(const char* led)
  66. {
  67.     if (get_pin(led) != -1) {
  68.         update_status(get_pin(led), 0);
  69.         bl_gpio_output_set(get_pin(led), 0);
  70.     }
  71. }

  72. /**
  73. * 获取指定灯的状态
  74. * @param  led     r、g、b
  75. * @return int 0 close,1open
  76. */
  77. uint8_t wb2_led_status(const char* led)
  78. {
  79.     int mask = 0;
  80.     switch (get_pin(led))
  81.     {
  82.         case PIN_R:
  83.             mask = 2;
  84.             break;
  85.         case PIN_G:
  86.             mask = 1;
  87.             break;
  88.         case PIN_B:
  89.             mask = 0;
  90.             break;
  91.     }
  92.     return (rgb >> mask) & 0x1;
  93. }

  94. void wb2_led_init(uint8_t pin)
  95. {
  96.     bl_gpio_enable_output(pin, 0, 1);
  97.     bl_gpio_output_set(pin, 0);
  98. }
复制代码

main函数中调用wb2_led_init初始化led,并在mqtt连接后订阅/on/led/wb2、/off/led/wb2、以及/get/led/wb2 三个主题,然后再收到消息后去处理消息的内容,消息内容
例如:
发送主题/on/led/wb2的内容为r表示打开wb2板载的r led灯
发送主题/off/led/wb2的内容为r表示关闭wb2板载的r led灯
发送主题/get/led/wb2的内容为r表示获取wb2板载的r led灯的状态,并发送到/status/led/wb2主题,1为打开,0为关闭
main中代码如下
  1. #define TOPIC_ON "/on/led/wb2"
  2. #define TOPIC_OFF "/off/led/wb2"
  3. #define TOPIC_GET "/get/led/wb2"
  4. #define TOPIC_STATUS "/status/led/wb2"

  5. /**
  6. * 根据不同的tpioc处理事件
  7. * @param  data
  8. * @param  topic
  9. */
  10. void resolve_led_message(const char* data, const char* topic, axk_mqtt_client_handle_t client) {

  11.     //打开灯
  12.     if (strcmp(topic, TOPIC_ON) == 0) {
  13.         wb2_led_open(data);
  14.     }
  15.     //关闭灯
  16.     else if (strcmp(TOPIC_OFF, topic) == 0) {
  17.         wb2_led_close(data);
  18.     }
  19.     //发送灯的状态
  20.     else if (strcmp(TOPIC_GET, topic) == 0) {
  21.         axk_mqtt_client_publish(client, wb2_led_status(data), TOPIC_STATUS, 1, 0, 0);
  22.     }

  23. }
复制代码
mqtt_data_task中创建一个queue队列接收消息并处理

  1. /**
  2. * 循环队列接收mqtt数据
  3. * @param  argv            
  4. */
  5. void mqtt_data_task(void* argv)
  6. {
  7.     //创建queue
  8.     mqtt_data_queue = xQueueCreate(10, sizeof(mqtt_data_t));
  9.     mqtt_data_t data;
  10.     while (pdTRUE == xQueueReceive(mqtt_data_queue, &data, portMAX_DELAY))
  11.     {
  12.         //接收处理消息
  13.         resolve_led_message(data.data, data.topic, client);
  14.     }
  15. }
复制代码

mqtt订阅接收消息
  1. switch ((axk_mqtt_event_id_t)event_id) {
  2.         case MQTT_EVENT_CONNECTED:
  3.             //订阅ON事件
  4.             msg_id = axk_mqtt_client_subscribe(client, TOPIC_ON, 0);
  5.             blog_info("sent subscribe successful, msg_id=%d", msg_id);

  6.             //订阅OFF事件
  7.             msg_id = axk_mqtt_client_subscribe(client, TOPIC_OFF, 0);
  8.             blog_info("sent subscribe successful, msg_id=%d", msg_id);

  9.             //订阅GET事件
  10.             msg_id = axk_mqtt_client_subscribe(client, TOPIC_GET, 0);
  11.             blog_info("sent subscribe successful, msg_id=%d", msg_id);
  12.             break;
  13.         case MQTT_EVENT_DATA:
  14.             //event->data后面没有'\0'分割,所以需要手动处理
  15.             char* data = pvPortMalloc(sizeof(char) * event->data_len + 1);
  16.             char* topic = pvPortMalloc(sizeof(char) * event->topic_len + 1);
  17.             mqtt_data_t* mqdata = (mqtt_data_t*)pvPortMalloc(sizeof(mqtt_data_t));

  18.             mqdata->data = data;
  19.             mqdata->topic = topic;

  20.             //复制data和topic
  21.             memset(data, '\0', event->data_len + 1);
  22.             memset(topic, '\0', event->topic_len + 1);
  23.             memcpy(data, event->data, event->data_len);
  24.             memcpy(topic, event->topic, event->topic_len);

  25.             //向queue中发送消息
  26.             xQueueSend(mqtt_data_queue, mqdata, 0);
  27.             
  28.             break;
  29.     }
复制代码
实验结果.png

  • 通过mqttx客户端设置并连接到mqtt服务器
  • 添加/status/led/wb2订阅
  • 发送/on/led/wb2主题消息,内容为g,可以看到开发板绿灯亮了
  • 发送/get/led/wb2主题消息,内容为g,可以看到客户端收到了数据1
  • 发送/off/led/wb2主题消息,内容为g,可以看到开发板绿灯灭了
  • 发送/get/led/wb2主题消息,内容为g,可以看到客户端收到了数据0

限于本人能力有限难免有错误或遗漏的地方请多多包涵
具体的代码上传到gitee代码仓库


回复

使用道具 举报

lovzx | 5 天前 | 显示全部楼层

<iframe src="https://player.bilibili.com/player.html?isOutside=true&aid=113133834213894&bvid=BV1YqtNecEwe&cid=25652893931&p=1" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"></iframe>

视频地址

回复 支持 反对

使用道具 举报

学习学习
选择去发光,而不是被照亮
回复

使用道具 举报

爱笑 | 5 天前 | 显示全部楼层
辛苦啦~
用心做好保姆工作
回复

使用道具 举报

WT_0213 | 5 天前 | 显示全部楼层
学习学习
回复

使用道具 举报

qhsj | 5 天前 | 显示全部楼层
学习了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则