news 2026/6/6 11:16:06

物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能
  • 物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能
    • 源码(mqtt-simulator-sample02)
    • 搭载 emqx 公共免费的broker
    • 相关依赖
    • 同步 vs 异步
    • 基于MqttAsyncClient构建
      • 部分源码
    • 演示结果

物联网实战:Spring Boot MQTT | 模拟器Paho客户端拆解高性能

MqttAsyncClient 非阻塞、高吞吐MQTT 应用

源码(mqtt-simulator-sample02)

https://gitee.com/kcnf-iot/mqtt-simulator

搭载 emqx 公共免费的broker

https://www.emqx.com/zh/mqtt/public-mqtt5-broker

相关依赖

<!-- MQTT Client --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.mqttv5.client</artifactId> <version>1.2.5</version> </dependency>

同步 vs 异步

维度同步 API(MqttClient)异步 API(MqttAsyncClient)
阻塞性阻塞等待执行完成非阻塞,操作完回调通知
线程模型当前线程等待底层 IO 线程,不阻塞主线程
高并发性能低,大量请求会阻塞极高,适合高吞吐、多设备
适用场景简单设备、单连接、低频次网关、高吞吐、消息转发、服务端
核心回调仅消息回调IMqttActionListener 监听所有操作结果

基于MqttAsyncClient构建

部分源码

package com.jysemel.iot; import org.eclipse.paho.mqttv5.client.*; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; public class MqttAsyncGateway { private MqttAsyncClient upClient; // 上行:收设备消息 private MqttAsyncClient downClient;// 下行:转发消息 // ====================== 频率控制(每秒最多1条,可自己改) ====================== private static final long PUBLISH_INTERVAL = 2000; // 2秒一条 private long lastSendTime = 0; public static void main(String[] args) { new MqttAsyncGateway().start(); } public void start() { try { // 创建两个异步客户端 upClient = new MqttAsyncClient(MqttConst.UP_BROKER, MqttConst.UP_CLIENT_ID, new MemoryPersistence()); downClient = new MqttAsyncClient(MqttConst.DOWN_BROKER, MqttConst.DOWN_CLIENT_ID, new MemoryPersistence()); MqttConnectionOptions options = new MqttConnectionOptions(); options.setKeepAliveInterval(10); options.setCleanStart(true); // 连接上行 upClient.connect(options, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("✅ 上行连接成功"); subscribeDeviceTopic(); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 上行连接失败"); } }); // 连接下行 downClient.connect(options, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("✅ 下行连接成功"); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 下行连接失败"); } }); // 消息回调 upClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) { String content = new String(message.getPayload()).trim(); // ====================== 频率控制核心 ====================== long now = System.currentTimeMillis(); if (now - lastSendTime < PUBLISH_INTERVAL) { System.out.println("⏱ 频率限制,跳过转发:" + content); return; } lastSendTime = now; System.out.println("\n📥 收到:" + topic + " -> " + content); forwardToDownstream(content); // 转发 } @Override public void disconnected(MqttDisconnectResponse disconnectResponse) {} @Override public void mqttErrorOccurred(MqttException exception) {} @Override public void deliveryComplete(IMqttToken token) {} @Override public void connectComplete(boolean reconnect, String serverURI) {} @Override public void authPacketArrived(int reasonCode, MqttProperties properties) {} }); } catch (Exception e) { e.printStackTrace(); } } // 订阅 private void subscribeDeviceTopic() { try { upClient.subscribe(MqttConst.DEVICE_TOPIC, 1, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("✅ 订阅成功:" + MqttConst.DEVICE_TOPIC); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 订阅失败"); } }); } catch (Exception e) { e.printStackTrace(); } } // 异步转发 private void forwardToDownstream(String content) { try { MqttMessage msg = new MqttMessage(content.getBytes()); msg.setQos(1); downClient.publish(MqttConst.FORWARD_TOPIC, msg, null, new MqttActionListener() { @Override public void onSuccess(IMqttToken token) { System.out.println("📤 转发完成:" + content); } @Override public void onFailure(IMqttToken token, Throwable exception) { System.err.println("❌ 转发失败"); } }); } catch (Exception e) { e.printStackTrace(); } } }

演示结果

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/6 11:15:05

告别硬件解调!用C语言在8KHz采样平台上实现FSK信号过零检测(附GitHub工程源码)

在8KHz采样平台上用纯C语言实现FSK信号过零检测的工程实践当我们需要在资源受限的嵌入式设备上实现FSK信号解调时&#xff0c;硬件解调器往往成为系统成本和功耗的瓶颈。本文将分享一种基于过零检测的纯软件解调方案&#xff0c;特别适合采样率仅为8KHz的低端嵌入式平台。不同于…

作者头像 李华
网站建设 2026/6/6 11:11:21

Chain of Draft:LLM推理加速的推测性解码新范式

1. 项目概述&#xff1a;当“少即是多”真正落地在AI推理链上你有没有遇到过这样的场景&#xff1a;跑一个中等复杂度的推理任务&#xff0c;模型明明参数量不大&#xff0c;但响应时间却卡在3秒以上&#xff0c;GPU显存占用还飙到85%&#xff0c;成本单次计算接近0.02美元——…

作者头像 李华
网站建设 2026/6/6 11:10:32

自动化理由生成:让AI决策可解释、可追溯、可审计

1. 项目概述&#xff1a;当AI开始“讲道理”&#xff0c;我们到底在期待什么&#xff1f;“Automated Rationale Generation: Moving Towards Explainable AI”——这个标题乍看像一篇学术论文的副标题&#xff0c;但在我过去十年跑遍几十个AI落地现场、从智能客服后台到医疗影…

作者头像 李华
网站建设 2026/6/6 11:09:34

抖音批量下载工具深度解析:如何高效获取无水印素材?

抖音批量下载工具深度解析&#xff1a;如何高效获取无水印素材&#xff1f; 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallb…

作者头像 李华