消息桥接
消息桥接功能允许 TMQTT 将 MQTT 消息按规则自动转发到外部下游系统,实现数据流转和集成。
工作流程
MQTT 客户端 → TMQTT Broker → [规则引擎匹配] → [Sink 发送] → 下游系统
↓
规则不匹配 → 正常 MQTT 转发(不影响)桥接功能与 MQTT 协议处理并行运行,不影响正常的 MQTT 消息收发。
规则引擎
规则定义了"哪些消息转发到哪里"。每条规则包含主题过滤条件和一个或多个 Sink(下游目标)。
规则管理
所有规则通过 HTTP API 管理,支持动态增删改:
创建规则:指定主题匹配模式(支持 MQTT 通配符 + 和 #),关联一个或多个 Sink。
更新规则:修改匹配主题、启用/禁用规则、调整 Sink 列表。
删除规则:移除不再需要的规则。
查看规则:获取当前所有规则及其状态。
支持的下游后端
TMQTT 支持将消息转发到以下下游系统:
Kafka
将 MQTT 消息发送到 Kafka Topic,适合数据管道和流处理场景。
主要配置:
- Kafka Broker 地址列表
- 目标 Topic
- 分区数(默认 6)
TDengine
将 MQTT 消息写入 TDengine 时序数据库,适合传感器数据采集和时序分析。
主要配置:
- TDengine WebSocket 连接地址和认证信息
- 目标数据库和超级表
- 子表名模板(支持动态主题替换)
- 标签和字段映射
RabbitMQ
将 MQTT 消息发送到 RabbitMQ Exchange,适合企业消息队列集成。
主要配置:
- RabbitMQ 连接地址和认证信息
- 目标 Exchange 名称
- 路由键模板
RocketMQ
将 MQTT 消息发送到 RocketMQ Topic,通过 HTTP Proxy 方式连接。
主要配置:
- RocketMQ HTTP Proxy 地址
- 目标 Topic 和 Group
可靠性保障
TMQTT 桥接功能内置多项可靠性机制,确保消息在转发过程中不丢失。
批量聚合
消息不会逐条发送,而是在缓冲区中按条件聚合后批量发送。当缓冲区满(默认 1MB)或定时器触发(默认 500ms)时触发一次发送。批量发送显著减少网络开销。
异步通道
消息通过异步通道(AsyncChannel)从 Broker 传递到 Sink,解耦了 MQTT 处理和下游发送,防止下游慢导致 Broker 阻塞。
熔断器
每个 Sink 内置熔断器,当连续失败超过阈值(默认 5 次)时自动触发熔断,暂停发送一段时间。等待冷却期过后自动尝试恢复,避免对已故障的下游系统持续施压。
死信队列(DLQ)
发送失败且超过最大重试次数(默认 3 次)的消息会被放入死信队列,而不是直接丢弃。死信消息可通过 API 查看和重新投递,确保重要消息不会因网络波动而丢失。
连接池
Sink 内部维护连接池(默认 10 个连接),复用连接发送消息,减少建连开销,提高吞吐。
Payload 过滤
规则可以配置 Payload 过滤条件,只有消息内容满足条件的才会被转发。
支持的过滤方式:
- 按主题前缀精确匹配
- 按 JSON 字段值匹配
监控与管理
所有桥接相关操作通过 HTTP API 完成:
- 查看规则列表和状态
- 创建/更新/删除规则
- 查看 Sink 状态(连接状态、成功率、熔断状态)
- 查看和重新投递死信队列中的消息
具体 API 接口请参见 管理 API。
Copyright (c) 2026 桃子 TaoZi.Pub https://taozi.pub | MIT License