随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。
本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储和计算架构、推送模型及服务器架构设计,推动 IoT 场景下消息处理的高效性和可扩展性以实现 MQTT 协议。
此外,阿里云 MQTT 以 RocketMQ-MQTT 为基础,不断进行迭代创新。阿里云是开源 RocketMQ-MQTT 的主要贡献者和使用者之一。面对设备通信峰谷时段差异性的挑战,本文将介绍阿里云如何将 Serverless 架构应用于消息队列,有效降低运营成本,同时利用云原生环境的特性,为 IoT 设备提供快速响应和灵活伸缩的通讯能力。
进一步地,我们将探讨介绍在云端生态体系中整合 MQTT 的实践,介绍基于统一存储的数据生态集成方案,展示其强大的技术能力和灵活的数据流转能力。
物联网技术,作为当代科技领域的璀璨明星,其迅猛发展态势已成共识。据权威预测,至 2025 年,全球物联网设备安装基数有望突破 200 亿大关,这一数字无疑昭示着一个万物互联时代的到来。
更进一步,物联网数据量正以惊人的年增长率约 28% 蓬勃膨胀,预示着未来数据生态中超过 90% 的实时数据将源自物联网。这一趋势深刻改变了数据处理的格局,将实时流数据处理推向以物联网数据为核心的新阶段。
边缘计算的崛起,则是对这一变革的积极响应。预计未来,高达 75% 的数据处理任务将在远离传统数据中心或云端的边缘侧完成。鉴于物联网数据的海量特性,依赖云端进行全部数据处理不仅成本高昂,且难以满足低延迟要求。因此,有效利用边缘计算资源,就地进行数据初步处理,仅将提炼后的关键信息上传云端,成为了提升效率、优化用户体验的关键策略。
在此背景下,消息传递机制在物联网场景中的核心价值愈发凸显:
总之,消息技术不仅是物联网架构的粘合剂,更是驱动数据流动与智能决策的核心动力,其在物联网领域的应用深度与广度,正随着技术迭代与市场需求的双重驱动而不断拓展。
同时传统消息场景和物联网消息场景有很多的不同,包含以下几个特点:
1)硬件资源差异
传统消息场景依托于高性能、高可靠性的服务集群,运算资源充沛,客户端部署环境多为容器、虚拟机乃至物理服务器,强调集中式计算能力。相比之下,物联网消息场景的客户端直接嵌入至网络边缘的微型设备中,如传感器、智能家电等,这些设备往往受限于极低的计算与存储资源,对能效比有着极高要求。
2)网络环境挑战
在经典的内部数据中心(IDC)环境中,消息处理享有稳定的网络条件和可控的带宽、延迟指标。而物联网环境则拓展至公共网络,面对的是复杂多变的网络状况,尤其在偏远地区或网络覆盖弱的区域,不稳定的连接成为常态,对消息传输的健壮性和效率提出了更高挑战。
3)客户端规模的量级跃迁
传统消息系统处理的日均消息量通常维持在百万级,适用于集中度较高的消息分发。物联网的兴起促使设备数量呈爆炸性增长,动辄涉及亿万级别的终端节点,这对系统的扩展性、消息路由的高效性提出了全新的要求。
4)生产与消费模式的演变
传统场景倾向于采用集中式同步生产模式,强调消息的一致性与有序处理。而物联网消息生成则体现出分散化的特性,每个设备根据其感知环境独立产生少量消息,这对消息收集与整合机制设计提出了新的思考。消费模式上,物联网场景经常涉及大规模广播或组播,单条消息可能触达数百万计的接收者,要求系统具备高效的广播能力和灵活的订阅管理机制。
我们看到,物联网所需要的消息技术与经典的消息设计有很大的不同。接下来我们来看看基于 RocketMQ 的融合架构 MQTT 设计为了解决物联网的消息场景有哪些特点。
1)融入 MQTT 协议,适应物联网环境特性
RocketMQ 5.0 通过整合 RocketMQ-MQTT,紧密贴合了物联网领域广泛采用的MQTT协议标准。此协议针对低功耗、网络条件不稳定的情况优化,以其轻量级特性和丰富的功能集脱颖而出,支持不同的消息传递保障级别,满足了从“最多一次”到“仅一次”的多样化需求。协议的领域模型与 RocketMQ 的核心组件高度协调,促进了消息、主题、发布订阅模式的自然融合,为建立一个从设备到云端的无缝消息传递体系打下了稳固的基础。
2)灵活的存储与计算解耦架构
为了应对物联网场景下对高并发连接和大规模数据处理的需求,RocketMQ 5.0 采取了存储与计算分离的架构设计。RocketMQ Broker 作为核心存储组件,确保了数据的持久化与可靠性,而 MQTT 相关的逻辑操作则在专门的代理层实施,这不仅优化了对大量连接、复杂订阅关系的管理,也增强了实时消息推送的能力。这种架构允许根据业务负载动态调整代理层资源,通过增加代理节点来平滑应对设备连接数的增加,体现了系统良好的弹性和扩展潜力。
3)促进端云数据协同的整合架构
RocketMQ-MQTT 通过其整合的架构设计,促进了物联网设备与云端应用之间的高效数据共享。基于统一的消息存储策略,每条消息在系统内只需存储一次,即可供两端消费,减少了数据冗余,提高了数据流通的效率。此外,RocketMQ 作为数据流的存储中枢,自然而然地与流计算技术结合,为实时分析物联网生成的海量数据提供了便利,加速了数据价值的发掘过程。
首先要解决的问题是物联网消息的存储模型。在发布订阅业务模型中,常用的存储模型有两种,写放大和读放大,我们将依次分析两种模型。
写放大模型:
在此模型下,每位消费者拥有专属的消息队列,每条消息需要复制并分布到所有目标消费者的队列中,消费者仅关注并处理自己队列中的消息。以三级主题/Topic/subTopic/test 为例,若该主题吸引了大量客户端订阅,采取“一客一队列”的策略,即每个符合订阅规则的客户端或通配符订阅均维护一份消息副本,将导致消息的存储需求随订阅者数量线性增长。
尽管这种模式在某些传统消息场景,比如遵循 AMQP 协议的应用中表现得游刃有余,因为它确保了消息传递的隔离性和可靠性。但在物联网场景下,特别是当单个消息需被数以百万计的设备消费时,“写放大”策略将引发严重的存储资源消耗问题,迅速成为不可承受之重。
读放大的考量与挑战:
鉴于物联网场景的特殊需求,直接应用传统的“写放大”模型显然是不可行的。为解决这一难题,RocketMQ-MQTT 采取了更为高效与灵活的存储策略,旨在减少存储冗余,提高系统整体的可扩展性和资源利用率:
在“读放大”模式下,每条消息实际上被存储一次,而为了支持通配符订阅的高效检索,系统在消息存储阶段会创建额外的索引信息——即 consume queue(消费队列)。对于如/Topic/subTopic/*这样的通配符订阅,系统会在每个匹配的通配符队列中生成相应的索引,使得订阅了不同通配符主题或具体主题的消费者,都能通过这些共享的存储实体找到并消费到消息。尽管这看似增加了“读”的复杂度,但实际上,每个 consume queue 作为索引,其体积远小于原始消息,显著降低了整体存储成本,同时提高了消息检索与分发的效率。
为此,我们设计了一种多维度分发的 Topic 队列模型,如上图所示,消息可以来自各个接入场景(如服务端的 MQ/AMQP、客户端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引(ConsumerQueue),如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,客户端 MQTT 场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。这样的一个队列模型就可以同时支持服务端和终端场景的接入和消息收发,达到一体化的目标。
实现这一模型,RocketMQ依托了两项关键技术特性:
通过采用“读放大”模型,结合 RocketMQ 的轻型队列特性和百万队列的底层技术支持,我们不仅有效解决了物联网环境下消息存储与分发的挑战,还实现了存储成本与系统性能的双重优化。这种设计不仅减少了存储空间的占用,还通过高度优化的索引机制加快了消息检索速度,为大规模物联网设备的消息通信提供了一个既经济又高效的解决方案。
在介绍完底层队列存储模型之后,我们将重点探讨匹配查找和可靠送达的实现机制。在传统的消息队列 RocketMQ 中,经典的消费模式是消费者通过客户端直接发起长轮询请求,以精准地获取对应主题的队列消息。然而,在 MQTT 场景下,由于客户端数量众多且订阅关系复杂,长轮询模式显得不够有效,因此消费过程变得更加复杂。为此,我们采用了一种推拉结合的模型。
本模型的核心在于终端通过 MQTT 协议连接至代理节点,消息可以来源于多种场景(如MQ、AMQP、MQTT)。当消息存入主题队列后,通知逻辑模块将实时监测到新消息的到达,进而生成消息事件(即消息的主题名称),并推送至网关节点。网关节点根据连接终端的订阅状态进行内部匹配,识别能够接收这一消息的终端,随后触发拉取请求,以从存储层读取消息并推送至终端。
在这个流程中,一个关键问题是通知模块如何确定终端感兴趣的消息,以及哪些网关节点会对此类消息感兴趣。这实际上是一个核心的匹配搜索问题。常见的解决方案主要有两种:第一种是简单的事件广播,第二种是将线上订阅关系集中存储(例如图中的 Lookup 模块),然后进行匹配搜索,再执行精准推送。
虽然事件广播机制在扩展性上存在一定问题,但其性能表现仍然良好,因为我们推送的数据量相对较小,仅为 Topic 名称。此外,同一 Topic 的消息事件可以合并为一个事件,这是我们当前在生产环境中默认采用的方式。另一方面,将线上订阅关系集中存储在 RDS 或 Redis 中也是一种普遍的做法,但这需要保证数据的实性,匹配搜索的过程可能会对整体实时消息链的延迟产生影响。
在该模型中,还设计了一个缓存模块,以便在需要广播大量消息时,避免各个终端对存储层发起重复的读取请求,从而提高整体系统的效率。
阿里云 MQTT 在 Serverless 上的实践
随着云原生技术的不断发展,现代消息中间件逐渐以容器编排为基础,如何实现真正的无服务器架构及秒级弹性管理已成为一项重要的研究课题。
阿里云作为开源 RocketMQ-MQTT 的主要贡献者和使用者之一,在 MQTT 弹性设计上有很多优化方式和实践经验。我们将介绍阿里云在弹性上的设计思路,展示其如何实现高效、弹性强的 MQTT 消息中间件:
1)抽离网络连接层
阿里云 MQTT 采用类似 Sidecar 的模式,将网络连接层与核心业务逻辑进行分离,使用 Rust 语言来处理网络连接。与 Java 相比,Rust 在内存消耗和启动速度上具有显著优势,尤其在处理大规模 MQTT 连接时,能够有效降低内存占用。
2)秒级扩容
每个 Pod 的资源请求设置较低,同时预留部分 Pod 专门运行 Rust 进程。在扩容需求出现时,系统能够快速启动 MQTT Proxy 进程,省去 Pod 创建和资源挂载的时间,从而显著提升响应速度。