本帖最后由 lovzx 于 2025-1-7 15:59 编辑
【Ai-WB2高级篇】MQTT 协议连接
MQTT
MQTT(Message Queuing Telemetry Transport)是一种轻量级适用于资源受限的设备下使用,特别是嵌入式领域
MQTT特点
轻量级: MQTT开销低、报文小非常适合资源受限的设备 可靠: MQTT支持多种QoS等级、会话感知和持久连接 安全通信: 支持TLS和SSL加密功能,还可以通过用户名/密码方式进行认证 双向通信: MQTT基于**发布-订阅**模式为设备之间提供了无缝双向通信,既可以发布消息也可以订阅消息,无须设备之间相互耦合,易于扩展 大规模物联网设备支持: MQTT轻量级、低带宽消耗,通过发布-订阅模式解耦设备与设备之间的关系,从而有效的网络流量和资源的使用 语言支持: 编程语言支持多
MQTT基本组件
发布订阅模式: 为了解决物联网设备之间的解耦问题,MQTT采用了发布-订阅模式,与传统的客户端-服务端的模式不同,MQTT发布者和订阅者无须知道对方的存在,无须建立直接的连接,通过MQTT Broker来负责消息的路由和分发
MQTT设备将温度数据发布到指定的Temperature主题,MQTT Broker收到消息后会将改数据转发给该主题的订阅者
主题topic
MQTT通过topic对发布和订阅的客户端进行管理,类似于分组,发布者可以发送订阅topic的所有组内成员,topic通过/来分割,支持+和#两种通配符
+: 表示单层匹配,例如:/a/+,匹配/a/b,和a/c #: 表示多层匹配,例如:a/#,匹配/a/x,/a/b/c
需要注意的是通配符只能用于订阅,不能用于发布
服务质量 QoS
为了适应不同的网络环境,MQTT提供了三种服务质量等级
QoS 0: 消息最多传送一次。如果客户端不可用,将丢失这条消息 QoS 1: 消息至少传送一次,保证可以收到消息,但可能会重复 QoS 2: 消息只传递一次,保证即不丢失也不重复
QoS等级从低到高消息可靠性增高,传输复杂度也提升
MQTT工作流程
客户端通过TCP/IP协议与MQTT Broker建立连接,可以选择TLS/SSL加密来实现,建立连接后既可以发布指定的主题也可以定于指定的主题,客户端发布时消息需要指定topic和QoS,Broker收到消息后会根据不同的QoS等级按照不通过的策略将消息分发给该主题的订阅者们
MQTT客户端
MQTTX是一款免费开源的全平台客户端,完全兼容MQTT协议 MQTT官网
免费公共MQTT服务器
MQTT免费服务器
服务器信息
地址: broker.emqx.iomqtt端口: 1883ws端口: 8083mqtts端口: 8883wss端口: 8084
证书文件可以从官方手动下载
连接成功后可以设置订阅内容或者发送消息
代码编写
MQTT需要用到lwip和网络相关的组件,需要在Makefile配置文件中添加相关的依赖
COMPONENTS_BLSYS := bltime blfdt blmtd bl_os_adapter rfparam_adapter_tmp bl602_os_adapter bl602 bl602_std
COMPONENTS_LOOPS := bloop loopadc looport yloop loopset looprt
COMPONENTS_VFS := romfs
COMPONENTS_NETWORK := sntp dns_server lwip lwip_dhcpd
COMPONENTS_MQTT := axk_common http-parser axk_tls axk_mqtt wpa_supplicant tcp_transport blcrypto_suite
COMPONENTS_WIIF := wifi wifi_manager wifi_hosal
INCLUDE_COMPONENTS += freertos_riscv_ram newlibc
INCLUDE_COMPONENTS += mbedtls_lts vfs hosal coredump blog
INCLUDE_COMPONENTS += utils cli cjson
INCLUDE_COMPONENTS += $(COMPONENTS_LOOPS)
INCLUDE_COMPONENTS += $(COMPONENTS_BLSYS)
INCLUDE_COMPONENTS += $(COMPONENTS_VFS)
INCLUDE_COMPONENTS += $(COMPONENTS_NETWORK)
INCLUDE_COMPONENTS += $(COMPONENTS_WIIF)
INCLUDE_COMPONENTS += $(COMPONENTS_MQTT)
INCLUDE_COMPONENTS += $(PROJECT_NAME) 复制代码
在获取到IP地址的地方开始mqtt的连接,使用的是sdk里面提供的axk_mqtt组件,其配置连接代码如下
axk_mqtt_client_config_t mqtt_cfg = {
.uri = "ws://broker.emqx.io:8083/mqtt",
.event_handle = event_cb,
//指定客户端名称
.client_id = "axk mqtt",
//设置服务器证书文件
// .cert_pem = CA_CRT,
// .cert_len = strlen(CA_CRT) + 1,
// .username = "wb2",
// .password = "wb2",
//客户端证书
// .client_cert_pem = CLIENT_CA,
// .client_key_pem = CLIENT_KEY,
};
axk_mqtt_client_handle_t client = axk_mqtt_client_init(&mqtt_cfg);
axk_mqtt_client_start(client);
复制代码
可以通过event->event_id来判断消息的类型,常用的的类型有一下几种
MQTT_EVENT_CONNECTED: 连接到mqtt服务器MQTT_EVENT_DISCONNECTED: 断开连接MQTT_EVENT_SUBSCRIBED: 订阅成功MQTT_EVENT_PUBLISHED: 发送成功MQTT_EVENT_DATA: 接收到消息
MQTT_EVENT_DATA用来接收消息,通过event->topic来判断是什么类型的消息,从而做出对应的操作,event->data是消息的内容
常用的函数
//初始化mqtt配置,并返回一个client对象
axk_mqtt_client_handle_t axk_mqtt_client_init(const axk_mqtt_client_config_t *config);
//重新连接
axk_err_t axk_mqtt_client_reconnect(axk_mqtt_client_handle_t client);
//断开连接
axk_err_t axk_mqtt_client_disconnect(axk_mqtt_client_handle_t client);
//设置client连接地址
axk_err_t axk_mqtt_client_set_uri(axk_mqtt_client_handle_t client, const char *uri);
//开始启动mqtt client
axk_err_t axk_mqtt_client_start(axk_mqtt_client_handle_t client);
//设置client配置
axk_err_t axk_mqtt_set_config(axk_mqtt_client_handle_t client, const axk_mqtt_client_config_t *config);
//订阅主题
int axk_mqtt_client_subscribe(axk_mqtt_client_handle_t client, const char *topic, int qos);
//取消订阅主题
int axk_mqtt_client_unsubscribe(axk_mqtt_client_handle_t client, const char *topic);
//发送主题消息
int axk_mqtt_client_publish(axk_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
//注册回调事件
axk_err_t axk_mqtt_client_register_event(axk_mqtt_client_handle_t client, axk_mqtt_event_id_t event, axk_event_handler_t event_handler, void *event_handler_arg); 复制代码
案例
通过mqtt协议来控制wb2板载led灯,首先创建wb2_led.h,代码如下
#ifndef _WB2_LED_H
#define _WB2_LED_H
#include <stdint.h>
#define PIN_R 14
#define PIN_G 17
#define PIN_B 3
#ifdef __cplusplus
extern "C" {
#endif
/**
* 初始化led
* @param pin
*/
void wb2_led_init(uint8_t pin);
/**
* 打开指定颜色的灯
* @param led r、g、b
*/
void wb2_led_open(const char* led);
/**
* 关闭指定颜色的灯
* @param led r、g、b
*/
void wb2_led_close(const char* led);
/**
* 获取指定灯的状态
* @param led r、g、b
* @return int 0 close,1open
*/
uint8_t wb2_led_status(const char* led);
#ifdef __cplusplus
}
#endif
#endif 复制代码 创建wb2_led.c,代码实现如下
#include "wb2_led.h"
#include <bl_gpio.h>
#include <string.h>
#include <stdint.h>
#include <bl602.h>
#include <glb_reg.h>
#include <stdio.h>
/**
* bit0: b
* bit1: g
* bit2: r
*/
static uint8_t rgb = 0;
void update_status(uint8_t pin, uint8_t data)
{
int mask = 0;
switch (pin)
{
case PIN_R:
mask = 2;
break;
case PIN_G:
mask = 1;
break;
case PIN_B:
mask = 0;
break;
}
if (data) {
rgb |= 1 << mask;
}
else {
rgb &= ~(1 << mask);
}
}
static int get_pin(const char* led)
{
int pin = -1;
if (strcmp(led, "r") == 0) {
pin = PIN_R;
}
else if (strcmp(led, "g") == 0) {
pin = PIN_G;
}
else if (strcmp(led, "b") == 0) {
pin = PIN_B;
}
return pin;
}
/**
* 打开指定颜色的灯
* @param led r、g、b
*/
void wb2_led_open(const char* led)
{
if (get_pin(led) != -1) {
update_status(get_pin(led), 1);
bl_gpio_output_set(get_pin(led), 1);
}
}
/**
* 关闭指定颜色的灯
* @param led r、g、b
*/
void wb2_led_close(const char* led)
{
if (get_pin(led) != -1) {
update_status(get_pin(led), 0);
bl_gpio_output_set(get_pin(led), 0);
}
}
/**
* 获取指定灯的状态
* @param led r、g、b
* @return int 0 close,1open
*/
uint8_t wb2_led_status(const char* led)
{
int mask = 0;
switch (get_pin(led))
{
case PIN_R:
mask = 2;
break;
case PIN_G:
mask = 1;
break;
case PIN_B:
mask = 0;
break;
}
return (rgb >> mask) & 0x1;
}
void wb2_led_init(uint8_t pin)
{
bl_gpio_enable_output(pin, 0, 1);
bl_gpio_output_set(pin, 0);
} 复制代码
main函数中调用wb2_led_init初始化led,并在mqtt连接后订阅/on/led/wb2、/off/led/wb2、以及/get/led/wb2 三个主题,然后再收到消息后去处理消息的内容,消息内容
例如:
发送主题/on/led/wb2的内容为r表示打开wb2板载的r led灯
发送主题/off/led/wb2的内容为r表示关闭wb2板载的r led灯
发送主题/get/led/wb2的内容为r表示获取wb2板载的r led灯的状态,并发送到/status/led/wb2主题,1为打开,0为关闭
main中代码如下
#define TOPIC_ON "/on/led/wb2"
#define TOPIC_OFF "/off/led/wb2"
#define TOPIC_GET "/get/led/wb2"
#define TOPIC_STATUS "/status/led/wb2"
/**
* 根据不同的tpioc处理事件
* @param data
* @param topic
*/
void resolve_led_message(const char* data, const char* topic, axk_mqtt_client_handle_t client) {
//打开灯
if (strcmp(topic, TOPIC_ON) == 0) {
wb2_led_open(data);
}
//关闭灯
else if (strcmp(TOPIC_OFF, topic) == 0) {
wb2_led_close(data);
}
//发送灯的状态
else if (strcmp(TOPIC_GET, topic) == 0) {
axk_mqtt_client_publish(client, wb2_led_status(data), TOPIC_STATUS, 1, 0, 0);
}
} 复制代码 mqtt_data_task中创建一个queue队列接收消息并处理
/**
* 循环队列接收mqtt数据
* @param argv
*/
void mqtt_data_task(void* argv)
{
//创建queue
mqtt_data_queue = xQueueCreate(10, sizeof(mqtt_data_t));
mqtt_data_t data;
while (pdTRUE == xQueueReceive(mqtt_data_queue, &data, portMAX_DELAY))
{
//接收处理消息
resolve_led_message(data.data, data.topic, client);
}
} 复制代码
mqtt订阅接收消息
switch ((axk_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
//订阅ON事件
msg_id = axk_mqtt_client_subscribe(client, TOPIC_ON, 0);
blog_info("sent subscribe successful, msg_id=%d", msg_id);
//订阅OFF事件
msg_id = axk_mqtt_client_subscribe(client, TOPIC_OFF, 0);
blog_info("sent subscribe successful, msg_id=%d", msg_id);
//订阅GET事件
msg_id = axk_mqtt_client_subscribe(client, TOPIC_GET, 0);
blog_info("sent subscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DATA:
//event->data后面没有'\0'分割,所以需要手动处理
char* data = pvPortMalloc(sizeof(char) * event->data_len + 1);
char* topic = pvPortMalloc(sizeof(char) * event->topic_len + 1);
mqtt_data_t* mqdata = (mqtt_data_t*)pvPortMalloc(sizeof(mqtt_data_t));
mqdata->data = data;
mqdata->topic = topic;
//复制data和topic
memset(data, '\0', event->data_len + 1);
memset(topic, '\0', event->topic_len + 1);
memcpy(data, event->data, event->data_len);
memcpy(topic, event->topic, event->topic_len);
//向queue中发送消息
xQueueSend(mqtt_data_queue, mqdata, 0);
break;
} 复制代码
通过mqttx客户端设置并连接到mqtt服务器 添加/status/led/wb2订阅 发送/on/led/wb2主题消息,内容为g,可以看到开发板绿灯亮了 发送/get/led/wb2主题消息,内容为g,可以看到客户端收到了数据1 发送/off/led/wb2主题消息,内容为g,可以看到开发板绿灯灭了 发送/get/led/wb2主题消息,内容为g,可以看到客户端收到了数据0
限于本人能力有限难免有错误或遗漏的地方请多多包涵
具体的代码上传到gitee代码仓库
工程下载下来放到SDK/applications/get-started/目录下,或者是随便放个位置,配置环境变量设置BL60X_SDK_PATH的值为SDK路径就好