thingsboard/summary/02-模块交互与通信机制.md

387 lines
12 KiB
Markdown
Raw Permalink Normal View History

2026-01-19 11:50:37 +08:00
# ThingsBoard 模块交互与通信机制分析
## 1. 概述
ThingsBoard 采用基于消息队列的异步通信机制,实现模块间的解耦和高可用性。核心通信协议使用 Protobuf 进行消息序列化,通过 Kafka、RabbitMQ 等消息队列系统进行消息传递。
## 2. 通信架构
### 2.1 消息队列架构
ThingsBoard 使用消息队列作为模块间通信的中间件,支持以下队列系统:
- **Kafka**: 高吞吐量分布式消息队列(推荐生产环境)
- **RabbitMQ**: 企业级消息队列
- **AWS SQS**: Amazon 云消息队列
- **Google Pub/Sub**: Google 云消息队列
- **Azure Service Bus**: Azure 云消息队列
- **In-Memory**: 内存队列(仅用于测试)
### 2.2 消息类型
根据消息的目标服务ThingsBoard 定义了以下主要消息类型:
1. **ToCoreMsg**: 发送到核心服务的消息
2. **ToCoreNotificationMsg**: 发送到核心服务的高优先级通知消息
3. **ToRuleEngineMsg**: 发送到规则引擎的消息
4. **ToRuleEngineNotificationMsg**: 发送到规则引擎的通知消息
5. **ToTransportMsg**: 发送到传输层的消息
6. **ToTransportNotificationMsg**: 发送到传输层的通知消息
### 2.3 服务类型
ThingsBoard 定义了以下服务类型ServiceType
- **TB_CORE**: 核心服务
- **TB_RULE_ENGINE**: 规则引擎服务
- **TB_TRANSPORT**: 传输服务
- **TB_EDQS**: 事件驱动查询服务
## 3. 数据流分析
### 3.1 设备遥测数据流
设备遥测数据的完整流程如下:
```
设备 -> 传输层 -> 消息队列 -> 核心服务 -> 规则引擎 -> 数据存储
-> WebSocket -> 前端UI
```
#### 3.1.1 传输层处理
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
**关键方法**: `process(PostTelemetryMsg)`
```java
// 处理设备遥测数据
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) {
// 1. 验证数据点限制
if (checkLimits(sessionInfo, msg, callback, dataPoints)) {
// 2. 记录设备活动
recordActivityInternal(sessionInfo);
// 3. 提取租户ID和设备ID
TenantId tenantId = getTenantId(sessionInfo);
DeviceId deviceId = new DeviceId(...);
// 4. 构建消息并发送到规则引擎
for (TsKvListProto tsKv : msg.getTsKvListList()) {
TbMsg tbMsg = TbMsg.newMsg()
.queueName(queueName)
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(deviceId)
.data(json)
.ruleChainId(ruleChainId)
.build();
// 5. 发送到规则引擎队列
ruleEngineProducerService.sendToRuleEngine(...);
}
}
}
```
**消息发送**: 传输层通过 `sendToRuleEngine()` 方法将消息发送到规则引擎队列。
#### 3.1.2 核心服务消费
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`
核心服务消费来自传输层的消息:
```java
// 处理来自传输层的消息
private void processToCoreMsg(ToCoreMsg toCoreMsg, TbCallback callback) {
if (toCoreMsg.hasToDeviceActorMsg()) {
// 转发到设备 Actor
TransportToDeviceActorMsg msg = toCoreMsg.getToDeviceActorMsg();
actorContext.tell(msg);
}
// ... 其他消息类型处理
}
```
#### 3.1.3 Actor 系统处理
**位置**: `application/src/main/java/org/thingsboard/server/actors/`
Actor 层次结构:
1. **AppActor**: 系统级 Actor
- 管理所有租户 Actor
- 处理系统级消息
2. **TenantActor**: 租户级 Actor
- 管理租户下的所有设备 Actor
- 处理租户级消息
3. **DeviceActor**: 设备级 Actor
- 处理单个设备的消息
- 管理设备状态和会话
**消息路由**:
```java
// AppActor 路由到 TenantActor
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
tenantActor -> tenantActor.tell(msg),
() -> msg.getCallback().onSuccess()
);
}
// TenantActor 路由到 DeviceActor
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
deviceActor.tell(msg);
}
```
#### 3.1.4 规则引擎处理
**位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`
```java
// 处理发送到规则引擎的消息
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
TbMsg tbMsg = msg.getMsg();
if (tbMsg.getRuleChainId() == null) {
// 使用根规则链
getRootChainActor().tell(msg);
} else {
// 使用指定的规则链
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
}
}
```
规则引擎处理消息后,可以通过规则节点执行各种操作:
- 保存遥测数据到数据库
- 触发告警
- 发送通知
- 调用外部服务
### 3.2 设备到核心服务的通信
#### 3.2.1 消息类型
传输层向核心服务发送的消息类型ToCoreMsg
- `TransportToDeviceActorMsg`: 设备 Actor 消息
- `DeviceStateServiceMsgProto`: 设备状态服务消息
- `DeviceConnectProto`: 设备连接消息
- `DeviceDisconnectProto`: 设备断开消息
- `DeviceInactivityProto`: 设备不活动消息
#### 3.2.2 发送流程
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
```java
// 发送消息到核心服务
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, ...) {
// 1. 解析目标分区
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
// 2. 发送消息
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback);
}
```
### 3.3 核心服务到传输层的通信
#### 3.3.1 消息类型
核心服务向传输层发送的消息类型ToTransportMsg
- `SessionCloseNotificationProto`: 会话关闭通知
- `GetAttributeResponseMsg`: 获取属性响应
- `AttributeUpdateNotificationMsg`: 属性更新通知
- `ToDeviceRpcRequestMsg`: 设备 RPC 请求
- `ToServerRpcResponseMsg`: 服务器 RPC 响应
- `EntityUpdateMsg`: 实体更新消息
- `EntityDeleteMsg`: 实体删除消息
#### 3.3.2 发送流程
**位置**: `application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java`
```java
// 核心服务发送消息到传输层
public void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
// 1. 获取传输服务节点主题
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId);
// 2. 发送消息
TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(NULL_UUID, msg);
tbTransportProducer.send(tpi, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
}
```
### 3.4 规则引擎消息流
#### 3.4.1 消息发送到规则引擎
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`
```java
// 推送消息到规则引擎
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
// 1. 解析目标分区
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
// 2. 构建 ToRuleEngineMsg
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsgProto(TbMsgProto.newBuilder()...)
.build();
// 3. 发送消息
producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(...), callback);
}
```
#### 3.4.2 规则引擎消费消息
规则引擎服务消费来自队列的消息,并通过 Actor 系统路由到相应的规则链进行处理。
## 4. 服务发现与负载均衡
### 4.1 服务发现
ThingsBoard 使用 ZooKeeper 进行服务发现:
- **服务注册**: 每个服务启动时在 ZooKeeper 注册
- **服务发现**: 通过 ZooKeeper 发现其他服务实例
- **健康检查**: 定期检查服务健康状态
### 4.2 分区服务
**位置**: `common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java`
分区服务负责:
1. **分区解析**: 根据租户ID和实体ID解析目标分区
2. **负载均衡**: 在多个服务实例间分配负载
3. **分区管理**: 管理分区变更事件
```java
// 解析目标分区
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
// 根据租户ID和实体ID计算分区
// 返回对应的 TopicPartitionInfo
}
```
### 4.3 主题服务
**位置**: `common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java`
主题服务负责:
1. **主题管理**: 管理消息队列主题
2. **通知主题**: 为每个服务实例创建通知主题
```java
// 获取服务通知主题
TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) {
// 返回服务实例的通知主题
}
```
## 5. 消息序列化
### 5.1 Protobuf 协议
ThingsBoard 使用 Protobuf 进行消息序列化,定义文件位于:
- `common/proto/src/main/proto/queue.proto`: 队列消息定义
- `common/proto/src/main/proto/transport.proto`: 传输协议定义
### 5.2 消息包装
所有队列消息都包装在 `TbProtoQueueMsg` 中:
```java
public class TbProtoQueueMsg<T extends GeneratedMessageV3> implements TbQueueMsg {
private final UUID key; // 路由键
private final T value; // Protobuf 消息
}
```
## 6. 异步回调机制
### 6.1 回调接口
**位置**: `common/queue/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java`
```java
public interface TbQueueCallback {
void onSuccess(TbQueueMsgMetadata metadata);
void onFailure(Throwable t);
}
```
### 6.2 回调使用
消息发送时可以提供回调:
```java
// 发送消息并注册回调
producer.send(tpi, msg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
// 处理成功
}
@Override
public void onFailure(Throwable t) {
// 处理失败
}
});
```
## 7. 错误处理与重试
### 7.1 消息确认
- 消息成功处理后发送确认
- 消息处理失败时根据配置进行重试
### 7.2 死信队列
处理失败的消息可以发送到死信队列进行后续处理。
## 8. 性能优化
### 8.1 批量处理
消息可以批量处理以提高吞吐量:
```java
// 批量处理消息
void process(List<TbProtoQueueMsg<ToCoreMsg>> msgs, TbCallback callback) {
// 批量处理逻辑
}
```
### 8.2 分区策略
- 按租户ID分区确保同一租户的消息在同一分区
- 按实体ID分区确保同一实体的消息在同一分区
## 9. 总结
ThingsBoard 的模块交互机制具有以下特点:
1. **异步通信**: 基于消息队列的异步通信,提高系统吞吐量
2. **解耦设计**: 模块间通过消息队列解耦,易于扩展和维护
3. **高可用性**: 支持多实例部署,通过分区服务实现负载均衡
4. **协议统一**: 使用 Protobuf 进行消息序列化,提高性能
5. **灵活扩展**: 支持多种消息队列实现,可根据需求选择
这种设计使得 ThingsBoard 能够处理大规模的物联网设备连接和数据流,同时保持系统的稳定性和可扩展性。