thingsboard/summary/02-模块交互与通信机制.md

387 lines
12 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ThingsBoard 模块交互与通信机制分析
## 1. 概述
ThingsBoard 采用基于消息队列的异步通信机制,实现模块间的解耦和高可用性。核心通信协议使用 Protobuf 进行消息序列化,通过 Kafka、RabbitMQ 等消息队列系统进行消息传递。
## 2. 通信架构
### 2.1 消息队列架构
ThingsBoard 使用消息队列作为模块间通信的中间件,支持以下队列系统:
- **Kafka**: 高吞吐量分布式消息队列(推荐生产环境)
- **RabbitMQ**: 企业级消息队列
- **AWS SQS**: Amazon 云消息队列
- **Google Pub/Sub**: Google 云消息队列
- **Azure Service Bus**: Azure 云消息队列
- **In-Memory**: 内存队列(仅用于测试)
### 2.2 消息类型
根据消息的目标服务ThingsBoard 定义了以下主要消息类型:
1. **ToCoreMsg**: 发送到核心服务的消息
2. **ToCoreNotificationMsg**: 发送到核心服务的高优先级通知消息
3. **ToRuleEngineMsg**: 发送到规则引擎的消息
4. **ToRuleEngineNotificationMsg**: 发送到规则引擎的通知消息
5. **ToTransportMsg**: 发送到传输层的消息
6. **ToTransportNotificationMsg**: 发送到传输层的通知消息
### 2.3 服务类型
ThingsBoard 定义了以下服务类型ServiceType
- **TB_CORE**: 核心服务
- **TB_RULE_ENGINE**: 规则引擎服务
- **TB_TRANSPORT**: 传输服务
- **TB_EDQS**: 事件驱动查询服务
## 3. 数据流分析
### 3.1 设备遥测数据流
设备遥测数据的完整流程如下:
```
设备 -> 传输层 -> 消息队列 -> 核心服务 -> 规则引擎 -> 数据存储
-> WebSocket -> 前端UI
```
#### 3.1.1 传输层处理
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
**关键方法**: `process(PostTelemetryMsg)`
```java
// 处理设备遥测数据
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) {
// 1. 验证数据点限制
if (checkLimits(sessionInfo, msg, callback, dataPoints)) {
// 2. 记录设备活动
recordActivityInternal(sessionInfo);
// 3. 提取租户ID和设备ID
TenantId tenantId = getTenantId(sessionInfo);
DeviceId deviceId = new DeviceId(...);
// 4. 构建消息并发送到规则引擎
for (TsKvListProto tsKv : msg.getTsKvListList()) {
TbMsg tbMsg = TbMsg.newMsg()
.queueName(queueName)
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.originator(deviceId)
.data(json)
.ruleChainId(ruleChainId)
.build();
// 5. 发送到规则引擎队列
ruleEngineProducerService.sendToRuleEngine(...);
}
}
}
```
**消息发送**: 传输层通过 `sendToRuleEngine()` 方法将消息发送到规则引擎队列。
#### 3.1.2 核心服务消费
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`
核心服务消费来自传输层的消息:
```java
// 处理来自传输层的消息
private void processToCoreMsg(ToCoreMsg toCoreMsg, TbCallback callback) {
if (toCoreMsg.hasToDeviceActorMsg()) {
// 转发到设备 Actor
TransportToDeviceActorMsg msg = toCoreMsg.getToDeviceActorMsg();
actorContext.tell(msg);
}
// ... 其他消息类型处理
}
```
#### 3.1.3 Actor 系统处理
**位置**: `application/src/main/java/org/thingsboard/server/actors/`
Actor 层次结构:
1. **AppActor**: 系统级 Actor
- 管理所有租户 Actor
- 处理系统级消息
2. **TenantActor**: 租户级 Actor
- 管理租户下的所有设备 Actor
- 处理租户级消息
3. **DeviceActor**: 设备级 Actor
- 处理单个设备的消息
- 管理设备状态和会话
**消息路由**:
```java
// AppActor 路由到 TenantActor
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
tenantActor -> tenantActor.tell(msg),
() -> msg.getCallback().onSuccess()
);
}
// TenantActor 路由到 DeviceActor
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
deviceActor.tell(msg);
}
```
#### 3.1.4 规则引擎处理
**位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`
```java
// 处理发送到规则引擎的消息
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
TbMsg tbMsg = msg.getMsg();
if (tbMsg.getRuleChainId() == null) {
// 使用根规则链
getRootChainActor().tell(msg);
} else {
// 使用指定的规则链
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
}
}
```
规则引擎处理消息后,可以通过规则节点执行各种操作:
- 保存遥测数据到数据库
- 触发告警
- 发送通知
- 调用外部服务
### 3.2 设备到核心服务的通信
#### 3.2.1 消息类型
传输层向核心服务发送的消息类型ToCoreMsg
- `TransportToDeviceActorMsg`: 设备 Actor 消息
- `DeviceStateServiceMsgProto`: 设备状态服务消息
- `DeviceConnectProto`: 设备连接消息
- `DeviceDisconnectProto`: 设备断开消息
- `DeviceInactivityProto`: 设备不活动消息
#### 3.2.2 发送流程
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
```java
// 发送消息到核心服务
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, ...) {
// 1. 解析目标分区
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
// 2. 发送消息
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback);
}
```
### 3.3 核心服务到传输层的通信
#### 3.3.1 消息类型
核心服务向传输层发送的消息类型ToTransportMsg
- `SessionCloseNotificationProto`: 会话关闭通知
- `GetAttributeResponseMsg`: 获取属性响应
- `AttributeUpdateNotificationMsg`: 属性更新通知
- `ToDeviceRpcRequestMsg`: 设备 RPC 请求
- `ToServerRpcResponseMsg`: 服务器 RPC 响应
- `EntityUpdateMsg`: 实体更新消息
- `EntityDeleteMsg`: 实体删除消息
#### 3.3.2 发送流程
**位置**: `application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java`
```java
// 核心服务发送消息到传输层
public void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
// 1. 获取传输服务节点主题
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId);
// 2. 发送消息
TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(NULL_UUID, msg);
tbTransportProducer.send(tpi, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure));
}
```
### 3.4 规则引擎消息流
#### 3.4.1 消息发送到规则引擎
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`
```java
// 推送消息到规则引擎
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
// 1. 解析目标分区
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
// 2. 构建 ToRuleEngineMsg
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsgProto(TbMsgProto.newBuilder()...)
.build();
// 3. 发送消息
producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(...), callback);
}
```
#### 3.4.2 规则引擎消费消息
规则引擎服务消费来自队列的消息,并通过 Actor 系统路由到相应的规则链进行处理。
## 4. 服务发现与负载均衡
### 4.1 服务发现
ThingsBoard 使用 ZooKeeper 进行服务发现:
- **服务注册**: 每个服务启动时在 ZooKeeper 注册
- **服务发现**: 通过 ZooKeeper 发现其他服务实例
- **健康检查**: 定期检查服务健康状态
### 4.2 分区服务
**位置**: `common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java`
分区服务负责:
1. **分区解析**: 根据租户ID和实体ID解析目标分区
2. **负载均衡**: 在多个服务实例间分配负载
3. **分区管理**: 管理分区变更事件
```java
// 解析目标分区
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
// 根据租户ID和实体ID计算分区
// 返回对应的 TopicPartitionInfo
}
```
### 4.3 主题服务
**位置**: `common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java`
主题服务负责:
1. **主题管理**: 管理消息队列主题
2. **通知主题**: 为每个服务实例创建通知主题
```java
// 获取服务通知主题
TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) {
// 返回服务实例的通知主题
}
```
## 5. 消息序列化
### 5.1 Protobuf 协议
ThingsBoard 使用 Protobuf 进行消息序列化,定义文件位于:
- `common/proto/src/main/proto/queue.proto`: 队列消息定义
- `common/proto/src/main/proto/transport.proto`: 传输协议定义
### 5.2 消息包装
所有队列消息都包装在 `TbProtoQueueMsg` 中:
```java
public class TbProtoQueueMsg<T extends GeneratedMessageV3> implements TbQueueMsg {
private final UUID key; // 路由键
private final T value; // Protobuf 消息
}
```
## 6. 异步回调机制
### 6.1 回调接口
**位置**: `common/queue/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java`
```java
public interface TbQueueCallback {
void onSuccess(TbQueueMsgMetadata metadata);
void onFailure(Throwable t);
}
```
### 6.2 回调使用
消息发送时可以提供回调:
```java
// 发送消息并注册回调
producer.send(tpi, msg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
// 处理成功
}
@Override
public void onFailure(Throwable t) {
// 处理失败
}
});
```
## 7. 错误处理与重试
### 7.1 消息确认
- 消息成功处理后发送确认
- 消息处理失败时根据配置进行重试
### 7.2 死信队列
处理失败的消息可以发送到死信队列进行后续处理。
## 8. 性能优化
### 8.1 批量处理
消息可以批量处理以提高吞吐量:
```java
// 批量处理消息
void process(List<TbProtoQueueMsg<ToCoreMsg>> msgs, TbCallback callback) {
// 批量处理逻辑
}
```
### 8.2 分区策略
- 按租户ID分区确保同一租户的消息在同一分区
- 按实体ID分区确保同一实体的消息在同一分区
## 9. 总结
ThingsBoard 的模块交互机制具有以下特点:
1. **异步通信**: 基于消息队列的异步通信,提高系统吞吐量
2. **解耦设计**: 模块间通过消息队列解耦,易于扩展和维护
3. **高可用性**: 支持多实例部署,通过分区服务实现负载均衡
4. **协议统一**: 使用 Protobuf 进行消息序列化,提高性能
5. **灵活扩展**: 支持多种消息队列实现,可根据需求选择
这种设计使得 ThingsBoard 能够处理大规模的物联网设备连接和数据流,同时保持系统的稳定性和可扩展性。