Skip to content

消息桥接

消息桥接功能允许 TMQTT 将 MQTT 消息按规则自动转发到外部下游系统,实现数据流转和集成。


工作流程

MQTT 客户端 → TMQTT Broker → [规则引擎匹配] → [Sink 发送] → 下游系统

                           规则不匹配 → 正常 MQTT 转发(不影响)

桥接功能与 MQTT 协议处理并行运行,不影响正常的 MQTT 消息收发。


规则引擎

规则定义了"哪些消息转发到哪里"。每条规则包含主题过滤条件和一个或多个 Sink(下游目标)。

规则管理

所有规则通过 HTTP API 管理,支持动态增删改:

创建规则:指定主题匹配模式(支持 MQTT 通配符 +#),关联一个或多个 Sink。

更新规则:修改匹配主题、启用/禁用规则、调整 Sink 列表。

删除规则:移除不再需要的规则。

查看规则:获取当前所有规则及其状态。


支持的下游后端

TMQTT 支持将消息转发到以下下游系统:

Kafka

将 MQTT 消息发送到 Kafka Topic,适合数据管道和流处理场景。

主要配置

  • Kafka Broker 地址列表
  • 目标 Topic
  • 分区数(默认 6)

TDengine

将 MQTT 消息写入 TDengine 时序数据库,适合传感器数据采集和时序分析。

主要配置

  • TDengine WebSocket 连接地址和认证信息
  • 目标数据库和超级表
  • 子表名模板(支持动态主题替换)
  • 标签和字段映射

RabbitMQ

将 MQTT 消息发送到 RabbitMQ Exchange,适合企业消息队列集成。

主要配置

  • RabbitMQ 连接地址和认证信息
  • 目标 Exchange 名称
  • 路由键模板

RocketMQ

将 MQTT 消息发送到 RocketMQ Topic,通过 HTTP Proxy 方式连接。

主要配置

  • RocketMQ HTTP Proxy 地址
  • 目标 Topic 和 Group

可靠性保障

TMQTT 桥接功能内置多项可靠性机制,确保消息在转发过程中不丢失。

批量聚合

消息不会逐条发送,而是在缓冲区中按条件聚合后批量发送。当缓冲区满(默认 1MB)或定时器触发(默认 500ms)时触发一次发送。批量发送显著减少网络开销。

异步通道

消息通过异步通道(AsyncChannel)从 Broker 传递到 Sink,解耦了 MQTT 处理和下游发送,防止下游慢导致 Broker 阻塞。

熔断器

每个 Sink 内置熔断器,当连续失败超过阈值(默认 5 次)时自动触发熔断,暂停发送一段时间。等待冷却期过后自动尝试恢复,避免对已故障的下游系统持续施压。

死信队列(DLQ)

发送失败且超过最大重试次数(默认 3 次)的消息会被放入死信队列,而不是直接丢弃。死信消息可通过 API 查看和重新投递,确保重要消息不会因网络波动而丢失。

连接池

Sink 内部维护连接池(默认 10 个连接),复用连接发送消息,减少建连开销,提高吞吐。


Payload 过滤

规则可以配置 Payload 过滤条件,只有消息内容满足条件的才会被转发。

支持的过滤方式

  • 按主题前缀精确匹配
  • 按 JSON 字段值匹配

监控与管理

所有桥接相关操作通过 HTTP API 完成:

  • 查看规则列表和状态
  • 创建/更新/删除规则
  • 查看 Sink 状态(连接状态、成功率、熔断状态)
  • 查看和重新投递死信队列中的消息

具体 API 接口请参见 管理 API

Copyright (c) 2026 桃子 TaoZi.Pub https://taozi.pub | MIT License