20 KiB
tb-mqtt-transport 流量分析
1. 镜像概述
tb-mqtt-transport 是 ThingsBoard 的 MQTT 传输服务,负责:
- 接收设备通过 MQTT 协议发送的消息
- 设备认证和会话管理
- 消息格式转换(JSON、Protobuf等)
- 将设备消息发送到核心服务(通过Kafka)
- 接收核心服务消息并推送到设备
镜像名称: thingsboard/tb-mqtt-transport
源码位置:
- 主应用:
transport/mqtt/src/main/java/org/thingsboard/server/mqtt/ - 传输处理:
common/transport/mqtt/
端口: 1883 (MQTT)
2. 流量入口
2.1 MQTT 协议入口
入口点:
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.javacommon/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
端口: 1883 (MQTT协议)
流量来源:
- IoT设备: 通过MQTT协议连接并发送数据
- MQTT客户端: 各种MQTT客户端库
- 网关设备: MQTT网关转发设备数据
MQTT消息类型:
CONNECT: 设备连接请求PUBLISH: 设备发布消息(遥测、属性、RPC响应等)SUBSCRIBE: 设备订阅主题(属性更新、RPC请求等)DISCONNECT: 设备断开连接
2.2 Kafka 消息入口(下行)
入口点:
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
Kafka主题: tb.transport.* (通知主题)
流量来源:
- tb-core: 核心服务推送的消息(RPC请求、属性更新通知等)
消息类型:
ToTransportMsg: 传输层消息ToDeviceRpcRequestMsg: RPC请求AttributeUpdateNotificationMsg: 属性更新通知SessionCloseNotificationProto: 会话关闭通知
3. 流量处理流程
3.1 设备连接流程(上行)
设备 (MQTT CONNECT)
↓
MqttTransportServerInitializer (Netty Channel初始化)
↓
MqttTransportHandler.channelRegistered()
↓
设备CONNECT消息
↓
MqttTransportHandler.processConnect()
↓
设备认证
├─ 通过TransportApiRequestMsg查询设备
├─ 验证设备凭证(AccessToken等)
└─ 获取设备信息
↓
创建设备会话 (DeviceSessionCtx)
↓
发送CONNACK确认
↓
订阅通知主题 (Kafka: tb.transport.*)
↓
设备连接完成
核心代码模块:
-
MqttTransportService (
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java)- 功能: MQTT传输服务启动和管理
- 关键方法:
init(): 初始化Netty服务器,绑定MQTT端口
@PostConstruct public void init() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MqttTransportServerInitializer(context, false)) .childOption(ChannelOption.SO_KEEPALIVE, keepAlive); serverChannel = b.bind(host, port).sync().channel(); }
-
MqttTransportHandler (
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java)- 功能: 处理MQTT消息的核心Handler
- 关键方法:
channelRead(): Netty Channel读取消息processMqttMsg(): 处理MQTT消息processConnect(): 处理设备连接
3.2 设备数据上报流程(上行)
设备 (MQTT PUBLISH)
主题: v1/devices/me/telemetry 或 v1/devices/me/attributes
↓
MqttTransportHandler.channelRead()
↓
MqttTransportHandler.processMqttMsg()
↓
processDevicePublish() (判断主题类型)
├─ 遥测数据 (isDeviceTelemetryTopic)
│ └─ PostTelemetryMsg
├─ 属性数据 (isDeviceAttributesTopic)
│ └─ PostAttributeMsg
├─ RPC响应 (DEVICE_RPC_RESPONSE_TOPIC)
│ └─ ToDeviceRpcResponseMsg
└─ 其他类型...
↓
MqttTransportAdaptor转换消息格式
├─ JSON格式 → Protobuf
├─ Protobuf格式 → Protobuf
└─ 自定义格式 → Protobuf
↓
DefaultTransportService.process()
↓
构建ToCoreMsg
├─ TransportToDeviceActorMsg (设备消息)
└─ 其他设备事件
↓
发送到Kafka (tb.core.* 主题)
↓
tb-core 接收并处理
核心代码模块:
-
MqttTransportHandler.processDevicePublish() (
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java)- 功能: 处理设备发布的MQTT消息
- 关键逻辑:
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { // 判断主题类型 if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) { // 遥测数据 TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, ...); } else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) { // 属性数据 TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, ...); } // ... }
-
MqttTransportAdaptor (
common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java)- 功能: MQTT消息格式转换
- 支持格式:
- JSON格式
- Protobuf格式
- 自定义格式
-
DefaultTransportService.process() (
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java)- 功能: 传输服务统一处理接口
- 关键方法:
process(PostTelemetryMsg): 处理遥测数据process(PostAttributeMsg): 处理属性数据sendToCore(): 发送消息到核心服务
3.3 核心服务消息推送流程(下行)
tb-core 发送消息
↓
Kafka (tb.transport.* 主题)
↓
DefaultTransportService.transportNotificationsConsumer
↓
消费消息 (ToTransportMsg)
├─ ToDeviceRpcRequestMsg: RPC请求
├─ AttributeUpdateNotificationMsg: 属性更新
└─ SessionCloseNotificationProto: 会话关闭
↓
根据nodeId路由到对应的传输服务实例
↓
MqttTransportHandler处理
├─ RPC请求 → 发布到设备订阅的RPC主题
├─ 属性更新 → 发布到设备订阅的属性主题
└─ 会话关闭 → 关闭MQTT连接
↓
设备接收MQTT消息
核心代码模块:
-
DefaultTransportService.init() - 初始化通知消费者
@PostConstruct public void init() { // 创建通知消费者 transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder() .name("TB Transport Notifications") .msgPackProcessor(this::processNotifications) .pollInterval(notificationsPollDuration) .consumerCreator(() -> queueProvider.createTransportNotificationsConsumer()) .consumerExecutor(consumerExecutor) .threadPrefix("transport-notifications") .build(); } -
processNotifications() - 处理通知消息
private void processNotifications(List<TbProtoQueueMsg<ToTransportMsg>> msgs, ...) { msgs.forEach(msg -> { ToTransportMsg toTransportMsg = msg.getValue(); // 根据消息类型处理 if (toTransportMsg.hasToDeviceRpcRequestMsg()) { // RPC请求 processRpcRequest(toTransportMsg.getToDeviceRpcRequestMsg()); } else if (toTransportMsg.hasAttributeUpdateNotificationMsg()) { // 属性更新 processAttributeUpdate(toTransportMsg.getAttributeUpdateNotificationMsg()); } }); } -
MqttTransportHandler发送MQTT消息 - 将消息推送到设备
private void sendRpcRequest(ToDeviceRpcRequestMsg rpcRequest) { // 获取设备订阅的RPC请求主题 String topic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; // 发布MQTT消息 publish(ctx, topic, rpcRequestPayload, MqttQoS.AT_LEAST_ONCE); }
3.4 设备订阅流程
设备 (MQTT SUBSCRIBE)
主题: v1/devices/me/attributes 或 v1/devices/me/rpc/request/+
↓
MqttTransportHandler.processSubscribe()
↓
判断订阅主题类型
├─ 属性订阅 → 订阅属性更新
├─ RPC请求订阅 → 订阅RPC请求
└─ 其他订阅...
↓
保存订阅信息到DeviceSessionCtx
↓
发送SUBACK确认
↓
当有属性更新或RPC请求时,发布到对应主题
4. 流量出口
4.1 Kafka消息出口(上行)
出口点: DefaultTransportService.sendToCore()
Kafka主题: tb.core.*
流量去向:
- tb-core: 核心服务处理设备数据
消息类型:
ToCoreMsg: 包含TransportToDeviceActorMsgPostTelemetryMsg: 遥测数据PostAttributeMsg: 属性数据ToDeviceRpcResponseMsg: RPC响应DeviceConnectProto: 设备连接事件DeviceDisconnectProto: 设备断开事件
4.2 MQTT消息出口(下行)
出口点: MqttTransportHandler.publish()
MQTT主题:
v1/devices/{deviceId}/attributes: 属性更新v1/devices/{deviceId}/rpc/request/{requestId}: RPC请求- 其他设备订阅的主题
流量去向:
- IoT设备: 设备接收MQTT消息
消息类型:
- PUBLISH消息(属性更新、RPC请求等)
5. 核心代码模块详解
5.1 ThingsboardMqttTransportApplication
位置: transport/mqtt/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java
功能:
- MQTT传输服务的Spring Boot应用入口
- 组件扫描配置
关键代码:
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common",
"org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue",
"org.thingsboard.server.cache"})
public class ThingsboardMqttTransportApplication {
public static void main(String[] args) {
SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
}
}
5.2 MqttTransportService
位置: common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
功能:
- 启动和管理MQTT服务器(基于Netty)
- 处理SSL连接(可选)
- 管理Netty EventLoopGroup
关键方法:
-
init(): 初始化MQTT服务器
- 创建Netty ServerBootstrap
- 绑定MQTT端口(默认1883)
- 配置Channel Pipeline
-
shutdown(): 关闭MQTT服务器
- 关闭Netty Channel
- 关闭EventLoopGroup
5.3 MqttTransportHandler
位置: common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java
功能:
- 处理所有MQTT消息的核心Handler
- 管理设备会话
- 消息格式转换和路由
关键方法:
-
channelRead(): Netty Channel读取消息
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof MqttMessage) { MqttMessage message = (MqttMessage) msg; if (message.decoderResult().isSuccess()) { processMqttMsg(ctx, message); } } } -
processMqttMsg(): 处理MQTT消息
- 根据消息类型(CONNECT、PUBLISH、SUBSCRIBE等)分发处理
-
processConnect(): 处理设备连接
- 设备认证
- 创建设备会话
- 发送CONNACK
-
processDevicePublish(): 处理设备发布消息
- 判断主题类型
- 转换消息格式
- 调用transportService.process()
-
processSubscribe(): 处理设备订阅
- 保存订阅信息
- 发送SUBACK
5.4 DeviceSessionCtx
位置: common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/device/MqttDeviceSessionCtx.java
功能:
- 管理设备MQTT会话信息
- 存储设备ID、会话ID等
- 管理订阅的主题
- 缓存设备配置
关键属性:
sessionId: 会话IDdeviceId: 设备IDtenantId: 租户IDsubscriptions: 订阅的主题列表
5.5 DefaultTransportService
位置: common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
功能:
- 传输服务的统一接口实现
- 处理所有类型的设备消息
- 发送消息到核心服务
- 接收核心服务消息并推送到设备
关键方法:
-
process(PostTelemetryMsg): 处理遥测数据
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) { // 构建TransportToDeviceActorMsg TransportToDeviceActorMsg.Builder builder = TransportToDeviceActorMsg.newBuilder() .setSessionInfo(sessionInfo) .setPostTelemetry(msg); // 发送到核心服务 sendToCore(tenantId, deviceId, builder.build(), ...); } -
sendToCore(): 发送消息到核心服务
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, ...) { // 解析目标分区 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); // 发送到Kafka tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback); } -
processNotifications(): 处理来自核心服务的通知
- 消费Kafka通知消息
- 路由到对应的设备会话
- 通过MQTT推送到设备
6. 数据流向图
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT协议 (端口1883) │
│ ├─ 设备连接 (CONNECT) │
│ ├─ 设备发布 (PUBLISH) │
│ │ ├─ 遥测数据: v1/devices/me/telemetry │
│ │ ├─ 属性数据: v1/devices/me/attributes │
│ │ └─ RPC响应: v1/devices/me/rpc/response/+ │
│ └─ 设备订阅 (SUBSCRIBE) │
│ │
│ 2. Kafka通知 (tb.transport.* 主题) │
│ └─ tb-core 推送的消息 │
│ ├─ RPC请求 │
│ ├─ 属性更新通知 │
│ └─ 会话关闭通知 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT消息处理 │
│ MqttTransportHandler │
│ ├─ 设备连接: processConnect() │
│ │ ├─ 设备认证 │
│ │ └─ 创建会话 (DeviceSessionCtx) │
│ ├─ 数据上报: processDevicePublish() │
│ │ ├─ 判断主题类型 │
│ │ ├─ 格式转换 (MqttTransportAdaptor) │
│ │ └─ 调用 transportService.process() │
│ └─ 设备订阅: processSubscribe() │
│ │
│ 2. 传输服务处理 │
│ DefaultTransportService │
│ ├─ process(PostTelemetryMsg) │
│ ├─ process(PostAttributeMsg) │
│ └─ sendToCore() → Kafka (tb.core.*) │
│ │
│ 3. 通知消息处理 │
│ DefaultTransportService.processNotifications() │
│ ├─ 消费Kafka通知消息 │
│ ├─ 路由到设备会话 │
│ └─ 通过MQTT推送到设备 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息 (tb.core.* 主题) │
│ → tb-core: 设备数据、事件等 │
│ │
│ 2. MQTT消息 (推送到设备) │
│ → IoT设备: 属性更新、RPC请求等 │
└─────────────────────────────────────────────────────────────────┘
7. 关键配置
7.1 环境变量配置
ZOOKEEPER_URL=zookeeper:2181: ZooKeeper地址TB_QUEUE_TYPE=kafka: 队列类型TB_KAFKA_SERVERS=kafka:9092: Kafka地址MQTT_BIND_ADDRESS=0.0.0.0: MQTT绑定地址MQTT_BIND_PORT=1883: MQTT端口
7.2 核心配置项
transport.mqtt.bind_address: MQTT绑定地址transport.mqtt.bind_port: MQTT端口transport.mqtt.timeout: MQTT超时时间transport.mqtt.netty.boss_group_thread_count: Netty Boss线程数transport.mqtt.netty.worker_group_thread_count: Netty Worker线程数
8. 支持的MQTT主题
8.1 设备遥测数据主题
v1/devices/me/telemetry: 标准遥测数据主题telemetry: 短主题(简化模式)
8.2 设备属性主题
v1/devices/me/attributes: 客户端属性v1/devices/me/attributes/request/+: 请求共享属性
8.3 RPC主题
v1/devices/me/rpc/request/+: 订阅RPC请求v1/devices/me/rpc/response/+: 发送RPC响应
9. 总结
tb-mqtt-transport 是 ThingsBoard 的 MQTT 传输层服务,负责:
- MQTT服务器: 基于Netty实现高性能MQTT服务器
- 设备连接管理: 设备认证、会话管理
- 消息转换: 支持多种消息格式(JSON、Protobuf等)
- 上行数据流: 设备数据 → MQTT → Kafka → tb-core
- 下行数据流: tb-core → Kafka → MQTT → 设备
- 双向通信: 支持设备上报和服务器推送
整个流程采用了异步非阻塞的Netty架构,支持大量并发连接,通过Kafka实现与核心服务的解耦通信。