1. 前言
随着物联网(IoT)的迅猛发展,海量设备需要高效、可靠、低功耗地进行数据交换。传统的HTTP协议虽然广泛使用,但其“请求-响应”模式在资源受限、网络不稳定的物联网环境中显得力不从心。这时,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)应运而生,成为物联网领域事实上的标准通信协议。
本文将带你全面了解MQTT协议的核心概念、工作原理、特点及实际应用,并通过C/C++ 代码示例快速上手。
2. 什么是MQTT?
MQTT是一种基于发布/订阅(Publish/Subscribe)模式的轻量级消息传输协议,由IBM于1999年开发。它专为低带宽、高延迟、网络不可靠的环境设计,非常适合传感器、移动设备等资源受限的场景。
MQTT运行在TCP/IP协议栈之上,默认端口为1883(加密版使用8883)。目前MQTT已经成为OASIS标准,版本有v3.1.1和v5.0。
3. MQTT的核心概念
MQTT协议中有三个核心角色:发布者、订阅者和代理(Broker)。
| 角色 | 说明 |
|---|---|
| 发布者(Publisher) | 发送消息的客户端,将数据发送到特定的“主题” |
| 订阅者(Subscriber) | 接收消息的客户端,订阅感兴趣的主题 |
| 代理(Broker) | 消息服务器,负责接收发布者的消息并转发给所有订阅了该主题的客户端 |
发布者和订阅者之间解耦——它们不需要知道彼此的存在,也不需要同时在线。Broker负责所有消息的路由和存储(如果需要)。
3.1 主题(Topic)
主题是UTF-8字符串,通过斜杠/分层,例如:
sensor/temperaturehome/livingroom/lightdevice/123/status
主题支持通配符:
+:单层通配符,例如sensor/+/temperature匹配sensor/room1/temperature#:多层通配符,例如sensor/#匹配sensor/room1/temperature和sensor/room2/humidity
4. MQTT的工作流程
客户端(发布者或订阅者)连接到Broker。
订阅者向Broker订阅一个或多个主题。
发布者向Broker发送一个指定主题的消息。
Broker查找订阅了该主题的所有客户端,并将消息推送给它们。
整个过程异步、低开销,非常适合实时数据推送。
5. MQTT的特点
轻量级:固定报头仅为2字节,相比HTTP大大减少网络开销。
发布/订阅解耦:发布者和订阅者松耦合,便于系统扩展。
三种服务质量(QoS):
QoS 0:最多一次(可能丢失),适合不重要数据。
QoS 1:至少一次(保证到达,可能重复)。
QoS 2:恰好一次(精确一次),开销最大。
持久会话:客户端断线后,Broker可保存未推送的消息,重连后自动推送。
遗愿消息(Last Will):客户端异常断开时,Broker自动发送预定义的遗愿消息,通知其他客户端。
保留消息(Retained Message):Broker为每个主题保留最后一条消息,新订阅者能立即收到该消息(例如获取设备最新状态)。
6. MQTT vs HTTP
| 特性 | MQTT | HTTP |
|---|---|---|
| 协议模式 | 发布/订阅 | 请求/响应 |
| 头部大小 | 2字节 | 通常几百字节 |
| 通信方向 | 双向异步 | 单向同步(客户端发起) |
| 适用场景 | 低功耗、弱网络、实时推送 | 万维网、文件传输 |
| 消息服务质量 | 支持3种QoS | 仅基于TCP重传 |
7. 常见MQTT Broker
| Broker | 特点 |
|---|---|
| EMQX | 开源分布式,支持大规模并发,企业级功能丰富 |
| Mosquitto | Eclipse基金会,轻量级,适合嵌入式或开发测试 |
| VerneMQ | 高可用,支持集群 |
| AWS IoT Core | 托管服务,与AWS生态集成 |
| 阿里云IoT平台 | 国内常用,设备接入方便 |
开发测试推荐使用Mosquitto,简单安装即可运行。
8. 实战:使用 C++ 实现 MQTT 客户端
我们将使用Paho MQTT C++ 客户端库(Eclipse Paho),它是 C++ 项目中使用 MQTT 的首选方案。在开始之前,需要先安装依赖。
8.1 安装 Paho MQTT C++ 库
Paho MQTT C++ 库依赖于底层的 C 库,因此需要先安装 C 库,再安装 C++ 库。
bash
# 1. 安装 Paho MQTT C 库 git clone https://github.com/eclipse/paho.mqtt.c.git cd paho.mqtt.c mkdir build && cd build cmake .. -DPAHO_WITH_SSL=ON make && sudo make install sudo ldconfig # 2. 安装 Paho MQTT C++ 库 git clone https://github.com/eclipse/paho.mqtt.cpp.git cd paho.mqtt.cpp mkdir build && cd build cmake .. make && sudo make install sudo ldconfig
8.2 订阅者代码(subscriber.cpp)
订阅者程序连接到 Broker,订阅主题test/temperature,并持续接收消息。
cpp
#include <iostream> #include <cstdlib> #include <string> #include <thread> #include <chrono> #include "mqtt/async_client.h" const std::string SERVER_ADDRESS("tcp://test.mosquitto.org:1883"); const std::string CLIENT_ID("cpp_subscriber"); const std::string TOPIC("test/temperature"); const int QOS = 1; // 回调类:处理连接丢失和消息到达事件 class mqtt_callback : public virtual mqtt::callback { public: void connection_lost(const std::string& cause) override { std::cout << "Connection lost: " << cause << std::endl; } void message_arrived(mqtt::const_message_ptr msg) override { std::cout << "Received: " << msg->get_topic() << " -> " << msg->to_string() << std::endl; } void delivery_complete(mqtt::delivery_token_ptr token) override {} }; int main(int argc, char* argv[]) { mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID); mqtt_callback cb; client.set_callback(cb); mqtt::connect_options connOpts; connOpts.set_keep_alive_interval(20); connOpts.set_clean_session(true); std::cout << "Connecting to broker..." << std::endl; try { client.connect(connOpts)->wait(); std::cout << "Connected. Subscribing to topic: " << TOPIC << std::endl; client.subscribe(TOPIC, QOS)->wait(); std::cout << "Waiting for messages..." << std::endl; // 保持程序运行,持续接收消息 while (true) { std::this_thread::sleep_for(std::chrono::seconds(1)); } } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; return 1; } return 0; }【代码说明】:上述代码中,我们定义了一个自定义回调类mqtt_callback,重写了message_arrived()方法来处理接收到的消息。subscribe()方法订阅指定主题,->wait()等待异步操作完成。
8.3 发布者代码(publisher.cpp)
发布者程序连接到 Broker,循环向主题test/temperature发送温度数据。
cpp
#include <iostream> #include <string> #include <thread> #include <chrono> #include <sstream> #include "mqtt/async_client.h" const std::string SERVER_ADDRESS("tcp://test.mosquitto.org:1883"); const std::string CLIENT_ID("cpp_publisher"); const std::string TOPIC("test/temperature"); const int QOS = 1; int main(int argc, char* argv[]) { mqtt::async_client client(SERVER_ADDRESS, CLIENT_ID); mqtt::connect_options connOpts; connOpts.set_keep_alive_interval(20); connOpts.set_clean_session(true); try { std::cout << "Connecting to broker..." << std::endl; client.connect(connOpts)->wait(); std::cout << "Connected." << std::endl; // 循环发送消息,模拟传感器数据 for (int i = 0; i < 10; ++i) { std::ostringstream oss; oss << "Temperature: " << (20 + i) << "°C"; std::string payload = oss.str(); auto msg = mqtt::make_message(TOPIC, payload); msg->set_qos(QOS); client.publish(msg)->wait(); std::cout << "Published: " << payload << std::endl; std::this_thread::sleep_for(std::chrono::seconds(2)); } client.disconnect()->wait(); std::cout << "Disconnected." << std::endl; } catch (const mqtt::exception& exc) { std::cerr << "Error: " << exc.what() << std::endl; return 1; } return 0; }【代码说明】:发布者使用mqtt::make_message()创建消息对象,通过publish()方法发送。使用->wait()确保消息发送完成后再继续循环。
8.4 编译与运行
bash
# 编译订阅者 g++ -o subscriber subscriber.cpp -lpaho-mqttpp3 -lpaho-mqtt3cs # 编译发布者 g++ -o publisher publisher.cpp -lpaho-mqttpp3 -lpaho-mqtt3cs
使用 Paho C++ 库编译时,需要链接两个库:-lpaho-mqttpp3(C++ 库)和-lpaho-mqtt3cs(C 库的同步版本)。
运行程序:先在一个终端运行订阅者,然后在另一个终端运行发布者,即可看到消息实时推送。
bash
# 终端1:运行订阅者 ./subscriber # 终端2:运行发布者 ./publisher
8.5 添加认证和 TLS(生产环境)
cpp
// 设置用户名密码 connOpts.set_user_name("username"); connOpts.set_password("password"); // 设置 SSL/TLS 配置 mqtt::ssl_options sslOpts; sslOpts.set_ca_cert_file("ca.crt"); sslOpts.set_enable_server_cert_auth(true); connOpts.set_ssl(sslOpts); // 使用 TLS 端口连接 const std::string SERVER_ADDRESS("ssl://your_broker.com:8883");9. MQTT 5.0 新特性简介
MQTT v5.0在v3.1.1基础上增加了:
会话过期:更精细的会话管理。
原因码:返回更详细的操作结果。
主题别名:减少主题字符串传输开销。
用户属性:允许自定义元数据(类似HTTP Header)。
请求/响应模式:在发布/订阅基础上增加同步交互能力。
目前很多Broker(如EMQX)已全面支持v5.0。如果想用 C++ 体验 MQTT 5.0,也可以尝试Boost.MQTT5库,它基于 Boost.Asio 构建,全面实现了 MQTT 5.0 协议标准,支持 QoS 0、1、2。
10. 扩展:纯 C 语言的 MQTT 实现
对于需要在纯 C 环境(如嵌入式设备)中使用 MQTT 的场景,Eclipse Paho 同样提供了 C 语言版本的客户端库,支持同步和异步两种 API 模式。
10.1 同步模式示例(C 语言)
c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "MQTTClient.h" #define ADDRESS "tcp://test.mosquitto.org:1883" #define CLIENTID "c_publisher" #define TOPIC "test/temperature" #define QOS 1 #define TIMEOUT 10000L int main(int argc, char* argv[]) { MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; // 创建客户端 MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); // 设置连接参数 conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.MQTTVersion = MQTTVERSION_3_1_1; // 连接 Broker if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(-1); } printf("Connected to broker\n"); // 循环发布消息 for (int i = 0; i < 10; i++) { char payload[64]; snprintf(payload, sizeof(payload), "Temperature: %d°C", 20 + i); pubmsg.payload = payload; pubmsg.payloadlen = strlen(payload); pubmsg.qos = QOS; pubmsg.retained = 0; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Published: %s (token: %d)\n", payload, token); sleep(2); } // 断开连接并清理 MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); return 0; }【编译命令】:
bash
gcc -o c_publisher publisher.c -lpaho-mqtt3c
C 语言版本使用MQTTClient_create()创建客户端,MQTTClient_connect()建立连接,MQTTClient_publishMessage()发布消息,MQTTClient_waitForCompletion()等待消息确认。
11. 应用场景
智能家居:传感器上报温度、控制开关状态。
车联网:车辆位置、故障码、远程控制。
工业物联网:PLC数据采集、设备告警。
即时聊天:轻量级消息推送(类似MQTT over WebSocket)。
移动App推送:Android、iOS后台消息通知。