thingsboard/summary/镜像流量分析/03-tb-mqtt-transport流量分析.md

575 lines
20 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# tb-mqtt-transport 流量分析
## 1. 镜像概述
**tb-mqtt-transport** 是 ThingsBoard 的 MQTT 传输服务,负责:
- 接收设备通过 MQTT 协议发送的消息
- 设备认证和会话管理
- 消息格式转换JSON、Protobuf等
- 将设备消息发送到核心服务通过Kafka
- 接收核心服务消息并推送到设备
**镜像名称**: `thingsboard/tb-mqtt-transport`
**源码位置**:
- 主应用: `transport/mqtt/src/main/java/org/thingsboard/server/mqtt/`
- 传输处理: `common/transport/mqtt/`
**端口**: `1883` (MQTT)
---
## 2. 流量入口
### 2.1 MQTT 协议入口
**入口点**:
- `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`
- `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`
**端口**: `1883` (MQTT协议)
**流量来源**:
- **IoT设备**: 通过MQTT协议连接并发送数据
- **MQTT客户端**: 各种MQTT客户端库
- **网关设备**: MQTT网关转发设备数据
**MQTT消息类型**:
- `CONNECT`: 设备连接请求
- `PUBLISH`: 设备发布消息遥测、属性、RPC响应等
- `SUBSCRIBE`: 设备订阅主题属性更新、RPC请求等
- `DISCONNECT`: 设备断开连接
### 2.2 Kafka 消息入口(下行)
**入口点**:
- `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
**Kafka主题**: `tb.transport.*` (通知主题)
**流量来源**:
- **tb-core**: 核心服务推送的消息RPC请求、属性更新通知等
**消息类型**:
- `ToTransportMsg`: 传输层消息
- `ToDeviceRpcRequestMsg`: RPC请求
- `AttributeUpdateNotificationMsg`: 属性更新通知
- `SessionCloseNotificationProto`: 会话关闭通知
---
## 3. 流量处理流程
### 3.1 设备连接流程(上行)
```
设备 (MQTT CONNECT)
MqttTransportServerInitializer (Netty Channel初始化)
MqttTransportHandler.channelRegistered()
设备CONNECT消息
MqttTransportHandler.processConnect()
设备认证
├─ 通过TransportApiRequestMsg查询设备
├─ 验证设备凭证AccessToken等
└─ 获取设备信息
创建设备会话 (DeviceSessionCtx)
发送CONNACK确认
订阅通知主题 (Kafka: tb.transport.*)
设备连接完成
```
**核心代码模块**:
1. **MqttTransportService** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`)
- **功能**: MQTT传输服务启动和管理
- **关键方法**:
- `init()`: 初始化Netty服务器绑定MQTT端口
```java
@PostConstruct
public void init() throws Exception {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context, false))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
serverChannel = b.bind(host, port).sync().channel();
}
```
2. **MqttTransportHandler** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`)
- **功能**: 处理MQTT消息的核心Handler
- **关键方法**:
- `channelRead()`: Netty Channel读取消息
- `processMqttMsg()`: 处理MQTT消息
- `processConnect()`: 处理设备连接
### 3.2 设备数据上报流程(上行)
```
设备 (MQTT PUBLISH)
主题: v1/devices/me/telemetry 或 v1/devices/me/attributes
MqttTransportHandler.channelRead()
MqttTransportHandler.processMqttMsg()
processDevicePublish() (判断主题类型)
├─ 遥测数据 (isDeviceTelemetryTopic)
│ └─ PostTelemetryMsg
├─ 属性数据 (isDeviceAttributesTopic)
│ └─ PostAttributeMsg
├─ RPC响应 (DEVICE_RPC_RESPONSE_TOPIC)
│ └─ ToDeviceRpcResponseMsg
└─ 其他类型...
MqttTransportAdaptor转换消息格式
├─ JSON格式 → Protobuf
├─ Protobuf格式 → Protobuf
└─ 自定义格式 → Protobuf
DefaultTransportService.process()
构建ToCoreMsg
├─ TransportToDeviceActorMsg (设备消息)
└─ 其他设备事件
发送到Kafka (tb.core.* 主题)
tb-core 接收并处理
```
**核心代码模块**:
1. **MqttTransportHandler.processDevicePublish()** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`)
- **功能**: 处理设备发布的MQTT消息
- **关键逻辑**:
```java
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
// 判断主题类型
if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
// 遥测数据
TransportProtos.PostTelemetryMsg postTelemetryMsg =
payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, ...);
} else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
// 属性数据
TransportProtos.PostAttributeMsg postAttributeMsg =
payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, ...);
}
// ...
}
```
2. **MqttTransportAdaptor** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java`)
- **功能**: MQTT消息格式转换
- **支持格式**:
- JSON格式
- Protobuf格式
- 自定义格式
3. **DefaultTransportService.process()** (`common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`)
- **功能**: 传输服务统一处理接口
- **关键方法**:
- `process(PostTelemetryMsg)`: 处理遥测数据
- `process(PostAttributeMsg)`: 处理属性数据
- `sendToCore()`: 发送消息到核心服务
### 3.3 核心服务消息推送流程(下行)
```
tb-core 发送消息
Kafka (tb.transport.* 主题)
DefaultTransportService.transportNotificationsConsumer
消费消息 (ToTransportMsg)
├─ ToDeviceRpcRequestMsg: RPC请求
├─ AttributeUpdateNotificationMsg: 属性更新
└─ SessionCloseNotificationProto: 会话关闭
根据nodeId路由到对应的传输服务实例
MqttTransportHandler处理
├─ RPC请求 → 发布到设备订阅的RPC主题
├─ 属性更新 → 发布到设备订阅的属性主题
└─ 会话关闭 → 关闭MQTT连接
设备接收MQTT消息
```
**核心代码模块**:
1. **DefaultTransportService.init()** - 初始化通知消费者
```java
@PostConstruct
public void init() {
// 创建通知消费者
transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder()
.name("TB Transport Notifications")
.msgPackProcessor(this::processNotifications)
.pollInterval(notificationsPollDuration)
.consumerCreator(() -> queueProvider.createTransportNotificationsConsumer())
.consumerExecutor(consumerExecutor)
.threadPrefix("transport-notifications")
.build();
}
```
2. **processNotifications()** - 处理通知消息
```java
private void processNotifications(List<TbProtoQueueMsg<ToTransportMsg>> msgs, ...) {
msgs.forEach(msg -> {
ToTransportMsg toTransportMsg = msg.getValue();
// 根据消息类型处理
if (toTransportMsg.hasToDeviceRpcRequestMsg()) {
// RPC请求
processRpcRequest(toTransportMsg.getToDeviceRpcRequestMsg());
} else if (toTransportMsg.hasAttributeUpdateNotificationMsg()) {
// 属性更新
processAttributeUpdate(toTransportMsg.getAttributeUpdateNotificationMsg());
}
});
}
```
3. **MqttTransportHandler发送MQTT消息** - 将消息推送到设备
```java
private void sendRpcRequest(ToDeviceRpcRequestMsg rpcRequest) {
// 获取设备订阅的RPC请求主题
String topic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
// 发布MQTT消息
publish(ctx, topic, rpcRequestPayload, MqttQoS.AT_LEAST_ONCE);
}
```
### 3.4 设备订阅流程
```
设备 (MQTT SUBSCRIBE)
主题: v1/devices/me/attributes 或 v1/devices/me/rpc/request/+
MqttTransportHandler.processSubscribe()
判断订阅主题类型
├─ 属性订阅 → 订阅属性更新
├─ RPC请求订阅 → 订阅RPC请求
└─ 其他订阅...
保存订阅信息到DeviceSessionCtx
发送SUBACK确认
当有属性更新或RPC请求时发布到对应主题
```
---
## 4. 流量出口
### 4.1 Kafka消息出口上行
**出口点**: `DefaultTransportService.sendToCore()`
**Kafka主题**: `tb.core.*`
**流量去向**:
- **tb-core**: 核心服务处理设备数据
**消息类型**:
- `ToCoreMsg`: 包含TransportToDeviceActorMsg
- `PostTelemetryMsg`: 遥测数据
- `PostAttributeMsg`: 属性数据
- `ToDeviceRpcResponseMsg`: RPC响应
- `DeviceConnectProto`: 设备连接事件
- `DeviceDisconnectProto`: 设备断开事件
### 4.2 MQTT消息出口下行
**出口点**: `MqttTransportHandler.publish()`
**MQTT主题**:
- `v1/devices/{deviceId}/attributes`: 属性更新
- `v1/devices/{deviceId}/rpc/request/{requestId}`: RPC请求
- 其他设备订阅的主题
**流量去向**:
- **IoT设备**: 设备接收MQTT消息
**消息类型**:
- PUBLISH消息属性更新、RPC请求等
---
## 5. 核心代码模块详解
### 5.1 ThingsboardMqttTransportApplication
**位置**: `transport/mqtt/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java`
**功能**:
- MQTT传输服务的Spring Boot应用入口
- 组件扫描配置
**关键代码**:
```java
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common",
"org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue",
"org.thingsboard.server.cache"})
public class ThingsboardMqttTransportApplication {
public static void main(String[] args) {
SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
}
}
```
### 5.2 MqttTransportService
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`
**功能**:
- 启动和管理MQTT服务器基于Netty
- 处理SSL连接可选
- 管理Netty EventLoopGroup
**关键方法**:
1. **init()**: 初始化MQTT服务器
- 创建Netty ServerBootstrap
- 绑定MQTT端口默认1883
- 配置Channel Pipeline
2. **shutdown()**: 关闭MQTT服务器
- 关闭Netty Channel
- 关闭EventLoopGroup
### 5.3 MqttTransportHandler
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`
**功能**:
- 处理所有MQTT消息的核心Handler
- 管理设备会话
- 消息格式转换和路由
**关键方法**:
1. **channelRead()**: Netty Channel读取消息
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof MqttMessage) {
MqttMessage message = (MqttMessage) msg;
if (message.decoderResult().isSuccess()) {
processMqttMsg(ctx, message);
}
}
}
```
2. **processMqttMsg()**: 处理MQTT消息
- 根据消息类型CONNECT、PUBLISH、SUBSCRIBE等分发处理
3. **processConnect()**: 处理设备连接
- 设备认证
- 创建设备会话
- 发送CONNACK
4. **processDevicePublish()**: 处理设备发布消息
- 判断主题类型
- 转换消息格式
- 调用transportService.process()
5. **processSubscribe()**: 处理设备订阅
- 保存订阅信息
- 发送SUBACK
### 5.4 DeviceSessionCtx
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/device/MqttDeviceSessionCtx.java`
**功能**:
- 管理设备MQTT会话信息
- 存储设备ID、会话ID等
- 管理订阅的主题
- 缓存设备配置
**关键属性**:
- `sessionId`: 会话ID
- `deviceId`: 设备ID
- `tenantId`: 租户ID
- `subscriptions`: 订阅的主题列表
### 5.5 DefaultTransportService
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
**功能**:
- 传输服务的统一接口实现
- 处理所有类型的设备消息
- 发送消息到核心服务
- 接收核心服务消息并推送到设备
**关键方法**:
1. **process(PostTelemetryMsg)**: 处理遥测数据
```java
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) {
// 构建TransportToDeviceActorMsg
TransportToDeviceActorMsg.Builder builder = TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(sessionInfo)
.setPostTelemetry(msg);
// 发送到核心服务
sendToCore(tenantId, deviceId, builder.build(), ...);
}
```
2. **sendToCore()**: 发送消息到核心服务
```java
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, ...) {
// 解析目标分区
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
// 发送到Kafka
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback);
}
```
3. **processNotifications()**: 处理来自核心服务的通知
- 消费Kafka通知消息
- 路由到对应的设备会话
- 通过MQTT推送到设备
---
## 6. 数据流向图
```
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT协议 (端口1883) │
│ ├─ 设备连接 (CONNECT) │
│ ├─ 设备发布 (PUBLISH) │
│ │ ├─ 遥测数据: v1/devices/me/telemetry │
│ │ ├─ 属性数据: v1/devices/me/attributes │
│ │ └─ RPC响应: v1/devices/me/rpc/response/+ │
│ └─ 设备订阅 (SUBSCRIBE) │
│ │
│ 2. Kafka通知 (tb.transport.* 主题) │
│ └─ tb-core 推送的消息 │
│ ├─ RPC请求 │
│ ├─ 属性更新通知 │
│ └─ 会话关闭通知 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT消息处理 │
│ MqttTransportHandler │
│ ├─ 设备连接: processConnect() │
│ │ ├─ 设备认证 │
│ │ └─ 创建会话 (DeviceSessionCtx) │
│ ├─ 数据上报: processDevicePublish() │
│ │ ├─ 判断主题类型 │
│ │ ├─ 格式转换 (MqttTransportAdaptor) │
│ │ └─ 调用 transportService.process() │
│ └─ 设备订阅: processSubscribe() │
│ │
│ 2. 传输服务处理 │
│ DefaultTransportService │
│ ├─ process(PostTelemetryMsg) │
│ ├─ process(PostAttributeMsg) │
│ └─ sendToCore() → Kafka (tb.core.*) │
│ │
│ 3. 通知消息处理 │
│ DefaultTransportService.processNotifications() │
│ ├─ 消费Kafka通知消息 │
│ ├─ 路由到设备会话 │
│ └─ 通过MQTT推送到设备 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息 (tb.core.* 主题) │
│ → tb-core: 设备数据、事件等 │
│ │
│ 2. MQTT消息 (推送到设备) │
│ → IoT设备: 属性更新、RPC请求等 │
└─────────────────────────────────────────────────────────────────┘
```
---
## 7. 关键配置
### 7.1 环境变量配置
- `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址
- `TB_QUEUE_TYPE=kafka`: 队列类型
- `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址
- `MQTT_BIND_ADDRESS=0.0.0.0`: MQTT绑定地址
- `MQTT_BIND_PORT=1883`: MQTT端口
### 7.2 核心配置项
- `transport.mqtt.bind_address`: MQTT绑定地址
- `transport.mqtt.bind_port`: MQTT端口
- `transport.mqtt.timeout`: MQTT超时时间
- `transport.mqtt.netty.boss_group_thread_count`: Netty Boss线程数
- `transport.mqtt.netty.worker_group_thread_count`: Netty Worker线程数
---
## 8. 支持的MQTT主题
### 8.1 设备遥测数据主题
- `v1/devices/me/telemetry`: 标准遥测数据主题
- `telemetry`: 短主题(简化模式)
### 8.2 设备属性主题
- `v1/devices/me/attributes`: 客户端属性
- `v1/devices/me/attributes/request/+`: 请求共享属性
### 8.3 RPC主题
- `v1/devices/me/rpc/request/+`: 订阅RPC请求
- `v1/devices/me/rpc/response/+`: 发送RPC响应
---
## 9. 总结
**tb-mqtt-transport** 是 ThingsBoard 的 MQTT 传输层服务,负责:
1. **MQTT服务器**: 基于Netty实现高性能MQTT服务器
2. **设备连接管理**: 设备认证、会话管理
3. **消息转换**: 支持多种消息格式JSON、Protobuf等
4. **上行数据流**: 设备数据 → MQTT → Kafka → tb-core
5. **下行数据流**: tb-core → Kafka → MQTT → 设备
6. **双向通信**: 支持设备上报和服务器推送
整个流程采用了**异步非阻塞**的Netty架构支持大量并发连接通过**Kafka**实现与核心服务的解耦通信。