thingsboard/summary/镜像流量分析/03-tb-mqtt-transport流量分析.md

575 lines
20 KiB
Markdown
Raw Permalink Normal View History

2026-01-19 11:50:37 +08:00
# tb-mqtt-transport 流量分析
## 1. 镜像概述
**tb-mqtt-transport** 是 ThingsBoard 的 MQTT 传输服务,负责:
- 接收设备通过 MQTT 协议发送的消息
- 设备认证和会话管理
- 消息格式转换JSON、Protobuf等
- 将设备消息发送到核心服务通过Kafka
- 接收核心服务消息并推送到设备
**镜像名称**: `thingsboard/tb-mqtt-transport`
**源码位置**:
- 主应用: `transport/mqtt/src/main/java/org/thingsboard/server/mqtt/`
- 传输处理: `common/transport/mqtt/`
**端口**: `1883` (MQTT)
---
## 2. 流量入口
### 2.1 MQTT 协议入口
**入口点**:
- `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`
- `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`
**端口**: `1883` (MQTT协议)
**流量来源**:
- **IoT设备**: 通过MQTT协议连接并发送数据
- **MQTT客户端**: 各种MQTT客户端库
- **网关设备**: MQTT网关转发设备数据
**MQTT消息类型**:
- `CONNECT`: 设备连接请求
- `PUBLISH`: 设备发布消息遥测、属性、RPC响应等
- `SUBSCRIBE`: 设备订阅主题属性更新、RPC请求等
- `DISCONNECT`: 设备断开连接
### 2.2 Kafka 消息入口(下行)
**入口点**:
- `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
**Kafka主题**: `tb.transport.*` (通知主题)
**流量来源**:
- **tb-core**: 核心服务推送的消息RPC请求、属性更新通知等
**消息类型**:
- `ToTransportMsg`: 传输层消息
- `ToDeviceRpcRequestMsg`: RPC请求
- `AttributeUpdateNotificationMsg`: 属性更新通知
- `SessionCloseNotificationProto`: 会话关闭通知
---
## 3. 流量处理流程
### 3.1 设备连接流程(上行)
```
设备 (MQTT CONNECT)
MqttTransportServerInitializer (Netty Channel初始化)
MqttTransportHandler.channelRegistered()
设备CONNECT消息
MqttTransportHandler.processConnect()
设备认证
├─ 通过TransportApiRequestMsg查询设备
├─ 验证设备凭证AccessToken等
└─ 获取设备信息
创建设备会话 (DeviceSessionCtx)
发送CONNACK确认
订阅通知主题 (Kafka: tb.transport.*)
设备连接完成
```
**核心代码模块**:
1. **MqttTransportService** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`)
- **功能**: MQTT传输服务启动和管理
- **关键方法**:
- `init()`: 初始化Netty服务器绑定MQTT端口
```java
@PostConstruct
public void init() throws Exception {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context, false))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
serverChannel = b.bind(host, port).sync().channel();
}
```
2. **MqttTransportHandler** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`)
- **功能**: 处理MQTT消息的核心Handler
- **关键方法**:
- `channelRead()`: Netty Channel读取消息
- `processMqttMsg()`: 处理MQTT消息
- `processConnect()`: 处理设备连接
### 3.2 设备数据上报流程(上行)
```
设备 (MQTT PUBLISH)
主题: v1/devices/me/telemetry 或 v1/devices/me/attributes
MqttTransportHandler.channelRead()
MqttTransportHandler.processMqttMsg()
processDevicePublish() (判断主题类型)
├─ 遥测数据 (isDeviceTelemetryTopic)
│ └─ PostTelemetryMsg
├─ 属性数据 (isDeviceAttributesTopic)
│ └─ PostAttributeMsg
├─ RPC响应 (DEVICE_RPC_RESPONSE_TOPIC)
│ └─ ToDeviceRpcResponseMsg
└─ 其他类型...
MqttTransportAdaptor转换消息格式
├─ JSON格式 → Protobuf
├─ Protobuf格式 → Protobuf
└─ 自定义格式 → Protobuf
DefaultTransportService.process()
构建ToCoreMsg
├─ TransportToDeviceActorMsg (设备消息)
└─ 其他设备事件
发送到Kafka (tb.core.* 主题)
tb-core 接收并处理
```
**核心代码模块**:
1. **MqttTransportHandler.processDevicePublish()** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`)
- **功能**: 处理设备发布的MQTT消息
- **关键逻辑**:
```java
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
// 判断主题类型
if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
// 遥测数据
TransportProtos.PostTelemetryMsg postTelemetryMsg =
payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, ...);
} else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
// 属性数据
TransportProtos.PostAttributeMsg postAttributeMsg =
payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, ...);
}
// ...
}
```
2. **MqttTransportAdaptor** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java`)
- **功能**: MQTT消息格式转换
- **支持格式**:
- JSON格式
- Protobuf格式
- 自定义格式
3. **DefaultTransportService.process()** (`common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`)
- **功能**: 传输服务统一处理接口
- **关键方法**:
- `process(PostTelemetryMsg)`: 处理遥测数据
- `process(PostAttributeMsg)`: 处理属性数据
- `sendToCore()`: 发送消息到核心服务
### 3.3 核心服务消息推送流程(下行)
```
tb-core 发送消息
Kafka (tb.transport.* 主题)
DefaultTransportService.transportNotificationsConsumer
消费消息 (ToTransportMsg)
├─ ToDeviceRpcRequestMsg: RPC请求
├─ AttributeUpdateNotificationMsg: 属性更新
└─ SessionCloseNotificationProto: 会话关闭
根据nodeId路由到对应的传输服务实例
MqttTransportHandler处理
├─ RPC请求 → 发布到设备订阅的RPC主题
├─ 属性更新 → 发布到设备订阅的属性主题
└─ 会话关闭 → 关闭MQTT连接
设备接收MQTT消息
```
**核心代码模块**:
1. **DefaultTransportService.init()** - 初始化通知消费者
```java
@PostConstruct
public void init() {
// 创建通知消费者
transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder()
.name("TB Transport Notifications")
.msgPackProcessor(this::processNotifications)
.pollInterval(notificationsPollDuration)
.consumerCreator(() -> queueProvider.createTransportNotificationsConsumer())
.consumerExecutor(consumerExecutor)
.threadPrefix("transport-notifications")
.build();
}
```
2. **processNotifications()** - 处理通知消息
```java
private void processNotifications(List<TbProtoQueueMsg<ToTransportMsg>> msgs, ...) {
msgs.forEach(msg -> {
ToTransportMsg toTransportMsg = msg.getValue();
// 根据消息类型处理
if (toTransportMsg.hasToDeviceRpcRequestMsg()) {
// RPC请求
processRpcRequest(toTransportMsg.getToDeviceRpcRequestMsg());
} else if (toTransportMsg.hasAttributeUpdateNotificationMsg()) {
// 属性更新
processAttributeUpdate(toTransportMsg.getAttributeUpdateNotificationMsg());
}
});
}
```
3. **MqttTransportHandler发送MQTT消息** - 将消息推送到设备
```java
private void sendRpcRequest(ToDeviceRpcRequestMsg rpcRequest) {
// 获取设备订阅的RPC请求主题
String topic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
// 发布MQTT消息
publish(ctx, topic, rpcRequestPayload, MqttQoS.AT_LEAST_ONCE);
}
```
### 3.4 设备订阅流程
```
设备 (MQTT SUBSCRIBE)
主题: v1/devices/me/attributes 或 v1/devices/me/rpc/request/+
MqttTransportHandler.processSubscribe()
判断订阅主题类型
├─ 属性订阅 → 订阅属性更新
├─ RPC请求订阅 → 订阅RPC请求
└─ 其他订阅...
保存订阅信息到DeviceSessionCtx
发送SUBACK确认
当有属性更新或RPC请求时发布到对应主题
```
---
## 4. 流量出口
### 4.1 Kafka消息出口上行
**出口点**: `DefaultTransportService.sendToCore()`
**Kafka主题**: `tb.core.*`
**流量去向**:
- **tb-core**: 核心服务处理设备数据
**消息类型**:
- `ToCoreMsg`: 包含TransportToDeviceActorMsg
- `PostTelemetryMsg`: 遥测数据
- `PostAttributeMsg`: 属性数据
- `ToDeviceRpcResponseMsg`: RPC响应
- `DeviceConnectProto`: 设备连接事件
- `DeviceDisconnectProto`: 设备断开事件
### 4.2 MQTT消息出口下行
**出口点**: `MqttTransportHandler.publish()`
**MQTT主题**:
- `v1/devices/{deviceId}/attributes`: 属性更新
- `v1/devices/{deviceId}/rpc/request/{requestId}`: RPC请求
- 其他设备订阅的主题
**流量去向**:
- **IoT设备**: 设备接收MQTT消息
**消息类型**:
- PUBLISH消息属性更新、RPC请求等
---
## 5. 核心代码模块详解
### 5.1 ThingsboardMqttTransportApplication
**位置**: `transport/mqtt/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java`
**功能**:
- MQTT传输服务的Spring Boot应用入口
- 组件扫描配置
**关键代码**:
```java
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common",
"org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue",
"org.thingsboard.server.cache"})
public class ThingsboardMqttTransportApplication {
public static void main(String[] args) {
SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
}
}
```
### 5.2 MqttTransportService
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`
**功能**:
- 启动和管理MQTT服务器基于Netty
- 处理SSL连接可选
- 管理Netty EventLoopGroup
**关键方法**:
1. **init()**: 初始化MQTT服务器
- 创建Netty ServerBootstrap
- 绑定MQTT端口默认1883
- 配置Channel Pipeline
2. **shutdown()**: 关闭MQTT服务器
- 关闭Netty Channel
- 关闭EventLoopGroup
### 5.3 MqttTransportHandler
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`
**功能**:
- 处理所有MQTT消息的核心Handler
- 管理设备会话
- 消息格式转换和路由
**关键方法**:
1. **channelRead()**: Netty Channel读取消息
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof MqttMessage) {
MqttMessage message = (MqttMessage) msg;
if (message.decoderResult().isSuccess()) {
processMqttMsg(ctx, message);
}
}
}
```
2. **processMqttMsg()**: 处理MQTT消息
- 根据消息类型CONNECT、PUBLISH、SUBSCRIBE等分发处理
3. **processConnect()**: 处理设备连接
- 设备认证
- 创建设备会话
- 发送CONNACK
4. **processDevicePublish()**: 处理设备发布消息
- 判断主题类型
- 转换消息格式
- 调用transportService.process()
5. **processSubscribe()**: 处理设备订阅
- 保存订阅信息
- 发送SUBACK
### 5.4 DeviceSessionCtx
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/device/MqttDeviceSessionCtx.java`
**功能**:
- 管理设备MQTT会话信息
- 存储设备ID、会话ID等
- 管理订阅的主题
- 缓存设备配置
**关键属性**:
- `sessionId`: 会话ID
- `deviceId`: 设备ID
- `tenantId`: 租户ID
- `subscriptions`: 订阅的主题列表
### 5.5 DefaultTransportService
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
**功能**:
- 传输服务的统一接口实现
- 处理所有类型的设备消息
- 发送消息到核心服务
- 接收核心服务消息并推送到设备
**关键方法**:
1. **process(PostTelemetryMsg)**: 处理遥测数据
```java
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) {
// 构建TransportToDeviceActorMsg
TransportToDeviceActorMsg.Builder builder = TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(sessionInfo)
.setPostTelemetry(msg);
// 发送到核心服务
sendToCore(tenantId, deviceId, builder.build(), ...);
}
```
2. **sendToCore()**: 发送消息到核心服务
```java
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, ...) {
// 解析目标分区
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
// 发送到Kafka
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback);
}
```
3. **processNotifications()**: 处理来自核心服务的通知
- 消费Kafka通知消息
- 路由到对应的设备会话
- 通过MQTT推送到设备
---
## 6. 数据流向图
```
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT协议 (端口1883) │
│ ├─ 设备连接 (CONNECT) │
│ ├─ 设备发布 (PUBLISH) │
│ │ ├─ 遥测数据: v1/devices/me/telemetry │
│ │ ├─ 属性数据: v1/devices/me/attributes │
│ │ └─ RPC响应: v1/devices/me/rpc/response/+ │
│ └─ 设备订阅 (SUBSCRIBE) │
│ │
│ 2. Kafka通知 (tb.transport.* 主题) │
│ └─ tb-core 推送的消息 │
│ ├─ RPC请求 │
│ ├─ 属性更新通知 │
│ └─ 会话关闭通知 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT消息处理 │
│ MqttTransportHandler │
│ ├─ 设备连接: processConnect() │
│ │ ├─ 设备认证 │
│ │ └─ 创建会话 (DeviceSessionCtx) │
│ ├─ 数据上报: processDevicePublish() │
│ │ ├─ 判断主题类型 │
│ │ ├─ 格式转换 (MqttTransportAdaptor) │
│ │ └─ 调用 transportService.process() │
│ └─ 设备订阅: processSubscribe() │
│ │
│ 2. 传输服务处理 │
│ DefaultTransportService │
│ ├─ process(PostTelemetryMsg) │
│ ├─ process(PostAttributeMsg) │
│ └─ sendToCore() → Kafka (tb.core.*) │
│ │
│ 3. 通知消息处理 │
│ DefaultTransportService.processNotifications() │
│ ├─ 消费Kafka通知消息 │
│ ├─ 路由到设备会话 │
│ └─ 通过MQTT推送到设备 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息 (tb.core.* 主题) │
│ → tb-core: 设备数据、事件等 │
│ │
│ 2. MQTT消息 (推送到设备) │
│ → IoT设备: 属性更新、RPC请求等 │
└─────────────────────────────────────────────────────────────────┘
```
---
## 7. 关键配置
### 7.1 环境变量配置
- `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址
- `TB_QUEUE_TYPE=kafka`: 队列类型
- `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址
- `MQTT_BIND_ADDRESS=0.0.0.0`: MQTT绑定地址
- `MQTT_BIND_PORT=1883`: MQTT端口
### 7.2 核心配置项
- `transport.mqtt.bind_address`: MQTT绑定地址
- `transport.mqtt.bind_port`: MQTT端口
- `transport.mqtt.timeout`: MQTT超时时间
- `transport.mqtt.netty.boss_group_thread_count`: Netty Boss线程数
- `transport.mqtt.netty.worker_group_thread_count`: Netty Worker线程数
---
## 8. 支持的MQTT主题
### 8.1 设备遥测数据主题
- `v1/devices/me/telemetry`: 标准遥测数据主题
- `telemetry`: 短主题(简化模式)
### 8.2 设备属性主题
- `v1/devices/me/attributes`: 客户端属性
- `v1/devices/me/attributes/request/+`: 请求共享属性
### 8.3 RPC主题
- `v1/devices/me/rpc/request/+`: 订阅RPC请求
- `v1/devices/me/rpc/response/+`: 发送RPC响应
---
## 9. 总结
**tb-mqtt-transport** 是 ThingsBoard 的 MQTT 传输层服务,负责:
1. **MQTT服务器**: 基于Netty实现高性能MQTT服务器
2. **设备连接管理**: 设备认证、会话管理
3. **消息转换**: 支持多种消息格式JSON、Protobuf等
4. **上行数据流**: 设备数据 → MQTT → Kafka → tb-core
5. **下行数据流**: tb-core → Kafka → MQTT → 设备
6. **双向通信**: 支持设备上报和服务器推送
整个流程采用了**异步非阻塞**的Netty架构支持大量并发连接通过**Kafka**实现与核心服务的解耦通信。