Skip to content

消息模块: MQTT

最后更新: 几秒前
实践版本: v2.15.0-SNAPSHOT

鸣谢

本功能最初由 Gitee@liquor(梓陌) 大佬提交 PR 并编写本文档,感谢大佬的开源贡献。

后端 PR:https://gitee.com/continew/continew-starter/pulls/85

continew-starter-messaging-mqtt 是 ContiNew Starter 面向 Spring Boot 的 MQTT 消息模块封装,提供开箱即用的生产者和消费者能力,同时支持注解和动态订阅。

xml
<dependency>
    <groupId>top.continew.starter</groupId>
    <artifactId>continew-starter-messaging-mqtt</artifactId>
</dependency>

主要特性

  • ✅ 基于 Spring Boot 自动配置,开箱即用

  • ✅ 支持注解订阅 (@MqttListener) 和动态订阅 (MqttOptions)

  • ✅ 支持 QoS 0/1/2,消息可靠性可配置

  • ✅ 支持消息持久化(保留消息 + 持久会话)

  • ✅ 支持主题通配符和共享订阅

  • ✅ 支持异步发布与消费,消费线程池可调

  • ✅ 自动处理注解和动态订阅的分发逻辑


配置示例

yaml
continew-starter.messaging:
  mqtt:
    enabled: true                     # 是否启用 MQTT 功能

    # ============================
    # 基础连接配置
    # ============================
    host: tcp://127.0.0.1:1883        # Broker 地址(支持 tcp / ssl / ws)
    username: ContiNew                # 用户名
    password: ContiNew                # 密码

    keepAliveInterval: 60             # 心跳间隔(秒)
    cleanSession: false               # 是否清除会话(false = 持久会话)
    automaticReconnect: true          # 启用自动重连
    maxReconnectDelay: 128000         # 自动重连最大延迟(ms)
    connectionTimeout: 30             # 连接超时(秒)
    disconnectedBufferSize: 5000      # 离线缓冲区大小(离线时缓存消息)
    bufferEnabled: true               # 是否启用离线缓冲
    executorServiceTimeout: 1         # 发布线程池关闭等待时长(秒)

    # ============================
    # 遗嘱消息(客户端异常掉线自动发布)
    # ============================
    will:
      topic: "client/offline"         # 遗嘱发布的主题
      payload: "offline"              # 遗嘱消息内容
      qos: 1                          # QoS 等级
      retained: true                  # 是否保留消息

    # ============================
    # SSL/TLS 配置(可选)
    # ============================
    httpsHostnameVerificationEnabled: true  # 是否启用 HTTPS 主机名验证
    sslHostnameVerifier: ""                 # 自定义 HostnameVerifier Bean 名称
    sslClientProps:
      keyStore: ""                    # 客户端证书 keystore 路径(可选)
      keyStorePassword: ""            # keystore 密码
      trustStore: ""                  # truststore 证书路径
      trustStorePassword: ""          # truststore 密码
    httpsHeader: {}                   # WebSocket 自定义 Header 参数

    # ============================
    # MQTT 生产者配置
    # ============================
    producer:
      clientId: "mqtt-producer-001"   # 生产者客户端 ID
      defaultQos: 0                   # 默认 QoS
      defaultTopic: ""                # 默认主题(sendToDefault 会用到)
      async: false                    # 是否异步发送
      defaultRetained: false          # 是否是保留消息
      executor:                      # 生产者线程池(处理 sendToMqtt)
        corePoolSize: 4               # 核心线程数
        maxPoolSize: 8                # 最大线程数
        queueCapacity: 512            # 队列容量

    # ============================
    # MQTT 消费者配置
    # ============================
    consumer:
      clientId: "mqtt-consumer-001"   # 消费者客户端 ID
      qos: 0                          # 默认订阅 QoS
      autoStartUp: true               # 是否随系统启动自动订阅注解主题
      async: false                    # 是否异步消费(true = 使用线程池处理)
      completionTimeout: 30000        # 消费超时(ms)
      executor:                      # 消费者线程池(处理消息)
        corePoolSize: 4               # 核心线程数量
        maxPoolSize: 8                # 最大线程数
        queueCapacity: 512            # 队列容量

使用方式

注解订阅

java
@Slf4j
@Service
@MqttListener(topic = "${robot.mqtt.topic}")
public class MqttConsumer implements MqttMessageConsumer {

    @Override
    public void onMessage(MqttMessage message) {
        log.info("收到消息 - Topic: {}", message.getTopic());
        log.info("消息内容: {}", message.getPayload());
    }
}

支持配置占位符、通配符、单级/多级匹配。

动态订阅

java
@Component
@RequiredArgsConstructor
public class SubscriptionManager implements ApplicationRunner {

    private final MqttOptions mqttOptions;

    @Value("${robot.mqtt.topic2}")
    private String topic2;

    @Override
    public void run(ApplicationArguments args) {
        mqttOptions.addTopics(topic2, "other/topic");
    }
}

发布消息

java
@RestController
@RequiredArgsConstructor
public class MqttController {

    private final MqttMessageProducer producer;

    @PostMapping("/publish")
    public ResponseEntity<String> publish(String payload) {
        try {
            producer.sendToMqtt("robot/data", payload);
            return ResponseEntity.ok("发送成功");
        } catch (MqttException e) {
            return ResponseEntity.status(500).body("发送失败: " + e.getMessage());
        }
    }
}

原理解析

  1. 注解订阅 (@MqttListener)

    • 扫描所有实现了 MqttMessageConsumer 的 Bean
    • 查找 @MqttListener 注解,注册主题和 QoS
    • 消息到达时由 MqttMessageInboundHandler 分发
  2. 动态订阅 (MqttOptions)

    • 可在运行时新增/删除订阅主题
    • 支持广播给所有动态订阅的监听器
  3. 消息分发逻辑

    • 优先匹配精确的注解订阅
    • 不匹配时,检查是否在动态订阅列表中
    • 支持通配符和共享订阅
    • 异常自动捕获,不影响其他监听器

高级特性

  • 通配符匹配

    • + 单级通配符
    • # 多级通配符(只能末尾使用)
  • 共享订阅
    支持多个消费者共享一个主题,消息轮询分发

  • 异步处理
    通过 async=true 配置,可避免阻塞主线程

  • 消息持久化

    yaml
    cleanSession: false
    producer:
      defaultRetained: true

❓ 常见问题

  1. QoS 选择

    • 0: 最多一次,最快,可能丢失

    • 1: 至少一次,可能重复

    • 2: 恰好一次,最可靠

  2. 消息积压

    • 调整消费者线程池

      yaml
      consumer:
        executor:
          corePoolSize: 8
          maxPoolSize: 16
          queueCapacity: 500
  3. 注解 vs 动态订阅

    • 注解: 固定主题,简洁

    • 动态: 运行时灵活,可混合使用


相关资源