消息模块: MQTT
鸣谢
本功能最初由 Gitee@liquor(梓陌) 大佬提交 PR 并编写本文档,感谢大佬的开源贡献。
continew-starter-messaging-mqtt 是 ContiNew Starter 面向 Spring Boot 的 MQTT 消息模块封装,提供开箱即用的生产者和消费者能力,同时支持注解和动态订阅。
xml
<dependency>
<groupId>top.continew.starter</groupId>
<artifactId>continew-starter-messaging-mqtt</artifactId>
</dependency>主要特性
✅ 基于 Spring Boot 自动配置,开箱即用
✅ 支持注解订阅 (
@MqttListener) 和动态订阅 (MqttOptions)✅ 支持 QoS 0/1/2,消息可靠性可配置
✅ 支持消息持久化(保留消息 + 持久会话)
✅ 支持主题通配符和共享订阅
✅ 支持异步发布与消费,消费线程池可调
✅ 自动处理注解和动态订阅的分发逻辑
配置示例
yaml
continew-starter.messaging:
mqtt:
enabled: true # 是否启用 MQTT 功能
# ============================
# 基础连接配置
# ============================
host: tcp://127.0.0.1:1883 # Broker 地址(支持 tcp / ssl / ws)
username: ContiNew # 用户名
password: ContiNew # 密码
keepAliveInterval: 60 # 心跳间隔(秒)
cleanSession: false # 是否清除会话(false = 持久会话)
automaticReconnect: true # 启用自动重连
maxReconnectDelay: 128000 # 自动重连最大延迟(ms)
connectionTimeout: 30 # 连接超时(秒)
disconnectedBufferSize: 5000 # 离线缓冲区大小(离线时缓存消息)
bufferEnabled: true # 是否启用离线缓冲
executorServiceTimeout: 1 # 发布线程池关闭等待时长(秒)
# ============================
# 遗嘱消息(客户端异常掉线自动发布)
# ============================
will:
topic: "client/offline" # 遗嘱发布的主题
payload: "offline" # 遗嘱消息内容
qos: 1 # QoS 等级
retained: true # 是否保留消息
# ============================
# SSL/TLS 配置(可选)
# ============================
httpsHostnameVerificationEnabled: true # 是否启用 HTTPS 主机名验证
sslHostnameVerifier: "" # 自定义 HostnameVerifier Bean 名称
sslClientProps:
keyStore: "" # 客户端证书 keystore 路径(可选)
keyStorePassword: "" # keystore 密码
trustStore: "" # truststore 证书路径
trustStorePassword: "" # truststore 密码
httpsHeader: {} # WebSocket 自定义 Header 参数
# ============================
# MQTT 生产者配置
# ============================
producer:
clientId: "mqtt-producer-001" # 生产者客户端 ID
defaultQos: 0 # 默认 QoS
defaultTopic: "" # 默认主题(sendToDefault 会用到)
async: false # 是否异步发送
defaultRetained: false # 是否是保留消息
executor: # 生产者线程池(处理 sendToMqtt)
corePoolSize: 4 # 核心线程数
maxPoolSize: 8 # 最大线程数
queueCapacity: 512 # 队列容量
# ============================
# MQTT 消费者配置
# ============================
consumer:
clientId: "mqtt-consumer-001" # 消费者客户端 ID
qos: 0 # 默认订阅 QoS
autoStartUp: true # 是否随系统启动自动订阅注解主题
async: false # 是否异步消费(true = 使用线程池处理)
completionTimeout: 30000 # 消费超时(ms)
executor: # 消费者线程池(处理消息)
corePoolSize: 4 # 核心线程数量
maxPoolSize: 8 # 最大线程数
queueCapacity: 512 # 队列容量使用方式
注解订阅
java
@Slf4j
@Service
@MqttListener(topic = "${robot.mqtt.topic}")
public class MqttConsumer implements MqttMessageConsumer {
@Override
public void onMessage(MqttMessage message) {
log.info("收到消息 - Topic: {}", message.getTopic());
log.info("消息内容: {}", message.getPayload());
}
}支持配置占位符、通配符、单级/多级匹配。
动态订阅
java
@Component
@RequiredArgsConstructor
public class SubscriptionManager implements ApplicationRunner {
private final MqttOptions mqttOptions;
@Value("${robot.mqtt.topic2}")
private String topic2;
@Override
public void run(ApplicationArguments args) {
mqttOptions.addTopics(topic2, "other/topic");
}
}发布消息
java
@RestController
@RequiredArgsConstructor
public class MqttController {
private final MqttMessageProducer producer;
@PostMapping("/publish")
public ResponseEntity<String> publish(String payload) {
try {
producer.sendToMqtt("robot/data", payload);
return ResponseEntity.ok("发送成功");
} catch (MqttException e) {
return ResponseEntity.status(500).body("发送失败: " + e.getMessage());
}
}
}原理解析
注解订阅 (
@MqttListener)- 扫描所有实现了
MqttMessageConsumer的 Bean - 查找
@MqttListener注解,注册主题和 QoS - 消息到达时由
MqttMessageInboundHandler分发
- 扫描所有实现了
动态订阅 (
MqttOptions)- 可在运行时新增/删除订阅主题
- 支持广播给所有动态订阅的监听器
消息分发逻辑
- 优先匹配精确的注解订阅
- 不匹配时,检查是否在动态订阅列表中
- 支持通配符和共享订阅
- 异常自动捕获,不影响其他监听器
高级特性
通配符匹配
+单级通配符#多级通配符(只能末尾使用)
共享订阅
支持多个消费者共享一个主题,消息轮询分发异步处理
通过async=true配置,可避免阻塞主线程消息持久化
yamlcleanSession: false producer: defaultRetained: true
❓ 常见问题
QoS 选择
0: 最多一次,最快,可能丢失
1: 至少一次,可能重复
2: 恰好一次,最可靠
消息积压
调整消费者线程池
yamlconsumer: executor: corePoolSize: 8 maxPoolSize: 16 queueCapacity: 500
注解 vs 动态订阅
注解: 固定主题,简洁
动态: 运行时灵活,可混合使用