Skip to content

FreeMQTT Plus: 一个 MQTT Broker 集群的实现

随着物联网(IoT)的快速发展,MQTT 协议因其轻量级、低带宽消耗和高效的发布-订阅模型,已成为物联网通信的事实标准。在这一背景下,MQTT Broker 作为消息传输的中枢,其性能和可靠性直接决定了整个系统的可用性。面对海量设备接入和高并发消息场景,单节点 Broker 的局限性逐渐暴露,而集群化部署成为解决这一问题的必然选择。本文将从集群设计目标、实现方案及挑战等多个角度,探讨 FreeMQTT Plus 集群的实现路径。


一、MQTT Broker 集群的设计目标

MQTT Broker 集群的核心目标是解决单点瓶颈问题,同时满足以下需求:

  1. 高可用性:通过多节点冗余,避免单点故障导致服务中断。
  2. 负载均衡:动态分配客户端连接和消息处理压力,提升系统吞吐量。
  3. 横向扩展:支持按需扩容,灵活应对设备数量增长和流量波动。
  4. 数据一致性:确保集群内会话(Session)状态和消息路由的一致性。

二、常见的集群实现方案

目前主流的 MQTT Broker 集群实现方案可分为以下几类:

1. 基于代理层的分布式架构
  • 实现原理:在 Broker 节点前部署负载均衡器(如 Nginx、HAProxy),通过 TCP/IP 层分发客户端连接。各 Broker 节点独立运行,不共享状态。
  • 优点:架构简单,易于部署;适合客户端会话无状态的场景(Clean Session=1)。
  • 缺点:无法处理持久化会话(Clean Session=0),消息可能因节点故障丢失;跨节点主题路由需额外处理。
  • 典型应用:EMQX 早期版本、HiveMQ 的负载均衡模式。
2. 共享订阅与数据同步
  • 实现原理:集群节点间通过共享数据库(如 Redis、RocksDB)或分布式一致性协议(如 Raft、Gossip)同步会话和消息数据。
  • 优点:支持持久化会话;节点故障时客户端可无缝迁移。
  • 缺点:数据同步可能成为性能瓶颈;强一致性要求可能牺牲吞吐量。
  • 典型应用:Mosquitto 结合 Redis 插件、EMQX 的 Mnesia 数据库同步。
3. 分片与路由优化
  • 实现原理:按客户端 ID 或主题(Topic)对集群节点进行分片,结合路由表实现消息精准投递。
  • 优点:减少跨节点通信开销;适合大规模主题场景。
  • 缺点:分片策略需预先设计,动态扩容复杂;需处理热点分片问题。
  • 典型应用:VerneMQ 的主题分区策略、NanoMQ 的横向分片方案。

三、FreeMQTT Plus 架构

FreeMQTT Plus 采用的是代理分布式架构。

组成:

  • Nginx 负载均衡器。
  • 黑节点(A Node) 即是我们已知的 MQTT Broker, 具有和其他节点组成集群的能力。
  • 白节点(B Node) 是连接各个黑节点,起着消息路由功能,对客户端是不可见的。
  • 日志节点(L Node)是汇集黑白节点日志的,此节点可选且只能有一个

架构图:


四、MQTT Broker 集群实现中的关键挑战

一个 MQTT Broker 集群需要解决如下的难题:

1. 会话状态的同步难题

MQTT 协议要求 Broker 维护客户端会话状态(如未确认的 QoS 1/2 消息)。在集群中,如何高效同步会话数据成为关键。一种折中方案是采用“粘性会话”(Sticky Session),通过负载均衡器将同一客户端固定到特定节点,但牺牲了故障转移的即时性。

2. 消息路由的复杂性

当发布者与订阅者位于不同节点时,消息需跨节点转发。常见的优化手段包括:

  • 主题树路由:将主题按层级拆分,节点仅订阅特定子树。
  • 元数据广播:通过 Gossip 协议快速传播订阅关系。
  • 边缘路由:在边缘节点缓存订阅信息,减少中心节点压力。
3. 横向扩展与资源争用

集群扩容时,新节点的加入可能引发资源竞争。例如,Kubernetes 等容器化平台可通过 StatefulSet 动态扩展 Broker Pod,但需配合服务发现机制(如 etcd)动态更新集群成员列表。


五、FreeMQTT Plus 的对策

对于上一节中提到的 MQTT Broker 集群实现的挑战,FreeMQTT Plus 采取如下对策:

1. 会话状态的同步
  • FreeMQTT Plus 采用分布式黑(A)白(B)节点架构
  • 在集群中,不需采用“粘性会话”(Sticky Session)
  • 客户的 session 保存连接的A节点上
  • 客户连接时, 当 clean senssion=0,该A节点随机选择(节点内部的负载均衡)一个B节点查询其他A节点是否有该clientID 的session存在,若存在则同步 session 信息 。
2. 消息路由
  • Retain 消息
    • MQTT 客户端订阅新的 Topic 时,查询本地A节点是否有保留消息有则发送给客户端
    • 客户所在的A节点随机选择(节点内部的负载均衡)一个B节点查询其他A节点是否有是否有对应该Topic的保留消息, 如有则返回给客户所在的A节点
  • 新发布的消息
    • MQTT 客户端发布某个 Topic 消息时,其所在的A节点随机选择(节点内部的负载均衡)一个B节点,把新的消息发给该B节点
    • B节点根据客户所属的Application ID, 来路由消息 (B节点根据客户连接时所属的Application记录A节点和Application关联)
    • 收到B节点路由来的消息后,A节点按正常的规则转发给相关订阅该 Topic 的客户端
  • 共享订阅
    • 当上面的B节点收到一个新的消息路由请求时,除了做常规订阅的路由时,该B节点还向集群中的每个A节点查询该消息Topic是有对应的共享订阅
    • A节点返回共享订阅信息的列表
    • B节点根据全部A节点返回的共享订阅列表,随机选个A节点分发给某个A节点(到达共享分发的目标)
3. 横向扩展
  • FreeMQTT Plus 的黑(A)白(B)节点是无主从关系的,每个的A节点是对等的不分主次
  • 同样每个B类节点也是不分主次
  • 客户的 session 信息分布在不同的A节点上,不进行中心存储同步
  • 客户的订阅信息也是分布存储在不同的A节点上
  • A、B节点都不存在单点故障问题
  • A、B节点都易于水平扩展
4. 黑白节点间通讯机制
  • 黑(A)白(B)节点之间通讯底层协议采用MQTT5
  • 借助 MQTT5 的 User Propertity 和 Correlation Data 属性定义节点间的通讯元语
  • 没有像其他MQTT Broker集群那样借助 gossip 或 redis 类的第三方协议或组件
  • 每个B节点都是一个特殊的MQTT客户端(属于特殊的Application)
  • 每个B节点作为FreeMQTT Plus集群的智能消息路由角色
  • 为使A、B节点进行通讯,FreeMQTT Plus 定义如下17个通讯元 (meta command):
    • 增加 App 引用计数 INC_APPREF,方向 A-->B
    • 减少 App 引用计数 DEC_APPREF,方向 A-->B
    • 获取 App 引用列表 FETCH_APPREF,方向 B-->A
    • 反馈 App 引用列表 FETCH_APPREF_ECHO,方向 A-->B
    • Pub 消息路由 ROUTE_MESSAGE,方向 A<-->B
    • 获取Retain消息 FETCH_RETAIN_MESSAGE,方向 B-->A
    • 返回Retain消息 RETAIN_MESSAGE_ECHO,方向 A-->B
    • 获取共享消息 FETCH_SHARE_MESSAGE,方向 B-->A
    • 返回共享消息 FETCH_SHARE_MESSAGE_ECHO,方向 A-->B
    • 获取共享消息 FETCH_SHARE_MESSAGE,方向 B-->A
    • 派发共享消息 DELIVERY_SHARE_MESSAGE,方向 B-->A
    • 获取A节点metrics FETCH_METRICS,方向 B-->A
    • 返回A节点metrics FETCH_METRICS,方向 A-->B
    • 清除客户session CLEAR_SESSION,方向 A<-->B
    • 检查客户session 是否存在 CHECK_SESSION,方向 B<-->A
    • 返回客户session 是否存在 CHECK_SESSION_ECHO,方向 A-->B
    • 检查客户session 完成 CHECK_SESSION_COMPLETE,方向 B<-->A

六、结语

MQTT Broker 集群的实现是一项系统性工程,需在一致性、可用性和扩展性之间寻求平衡。未来,随着 5G 和边缘计算的普及,多级集群(中心-边缘协同)、混合云部署等新模式将进一步推动 MQTT 集群架构的演进。唯有深入理解协议特性与业务场景,方能设计出既稳健又灵活的集群方案。


希望这篇文章能够为 MQTT 集群的实践提供有价值的参考!