thingsboard/summary/03-核心模块源码分析/01-Actor系统分析.md

326 lines
9.8 KiB
Markdown
Raw Permalink Normal View History

2026-01-19 11:50:37 +08:00
# ThingsBoard Actor 系统源码分析
## 1. 概述
ThingsBoard 使用 Actor 模型来处理并发和消息传递。Actor 系统是 ThingsBoard 的核心架构,负责管理设备、租户和规则链的生命周期。
## 2. Actor 层次结构
### 2.1 Actor 层次
```
AppActor (系统级)
└── TenantActor (租户级)
├── DeviceActor (设备级)
└── RuleChainActor (规则链级)
```
### 2.2 Actor 类型
#### 2.2.1 AppActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/app/AppActor.java`
**作用**: 系统级 Actor管理所有租户 Actor。
**关键方法**:
```java
/**
* 初始化租户 Actor
* 系统启动时,为所有已存在的租户创建 TenantActor
*/
private void initTenantActors() {
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(
tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(
tenantActor -> log.debug("[{}] Tenant actor created.", tenant.getId()),
() -> log.debug("[{}] Skipped actor creation", tenant.getId())
);
}
}
}
/**
* 获取或创建租户 Actor
*/
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
if (deletedTenants.contains(tenantId)) {
return Optional.empty();
}
return Optional.ofNullable(ctx.getOrCreateChildActor(
new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId),
() -> true));
}
/**
* 处理发送到设备 Actor 的消息
* 路由到对应的租户 Actor
*/
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
},
() -> {
// 如果租户不存在,直接成功回调
if (msg instanceof TransportToDeviceActorMsgWrapper) {
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
}
}
);
}
```
**处理的消息类型**:
- `APP_INIT_MSG`: 应用初始化消息
- `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层到设备 Actor 的消息
- `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息
- `PARTITION_CHANGE_MSG`: 分区变更消息
- `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息
#### 2.2.2 TenantActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`
**作用**: 租户级 Actor管理租户下的所有设备 Actor 和规则链 Actor。
**关键方法**:
```java
/**
* 处理发送到设备 Actor 的消息
* 路由到对应的设备 Actor
*/
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
if (!isCore) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
}
if (deletedDevices.contains(msg.getDeviceId())) {
log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
return;
}
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
if (priority) {
deviceActor.tellWithHighPriority(msg);
} else {
deviceActor.tell(msg);
}
}
/**
* 处理发送到规则引擎的消息
*/
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (!isRuleEngine) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getMsg();
if (getApiUsageState().isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
// 使用根规则链
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
} else {
tbMsg.getCallback().onFailure(
new RuleEngineException("No Root Rule Chain available!"));
}
} else {
// 使用指定的规则链
try {
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
} catch (TbActorNotRegisteredException ex) {
log.trace("Received message for non-existing rule chain: [{}]",
tbMsg.getRuleChainId());
tbMsg.getCallback().onSuccess();
}
}
} else {
log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
tbMsg.getCallback().onSuccess();
}
}
```
#### 2.2.3 DeviceActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`
**作用**: 设备级 Actor处理单个设备的所有消息。
**关键方法**:
```java
/**
* 处理消息
*/
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
processor.process((TransportToDeviceActorMsgWrapper) msg);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processAttributesUpdate((DeviceAttributesEventNotificationMsg) msg);
break;
case DEVICE_DELETE_TO_DEVICE_ACTOR_MSG:
ctx.stop(ctx.getSelf());
break;
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processCredentialsUpdate(msg);
break;
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
break;
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
break;
case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
processor.processRpcResponsesFromEdge((FromDeviceRpcResponseActorMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
processor.checkSessionsTimeout();
break;
// ... 其他消息类型
default:
return false;
}
return true;
}
```
**实际处理逻辑**: DeviceActor 使用 `DeviceActorMessageProcessor` 来处理具体的业务逻辑。
## 3. Actor 消息处理流程
### 3.1 消息路由流程
1. **传输层消息**:
```
传输层 -> 消息队列 -> 核心服务消费 -> AppActor -> TenantActor -> DeviceActor
```
2. **规则引擎消息**:
```
核心服务 -> 消息队列 -> 规则引擎服务消费 -> AppActor -> TenantActor -> RuleChainActor
```
### 3.2 消息优先级
Actor 系统支持两种消息优先级:
- **普通消息**: `tell(msg)` - 正常优先级
- **高优先级消息**: `tellWithHighPriority(msg)` - 高优先级,优先处理
高优先级消息用于:
- 设备凭证更新
- 设备名称/类型更新
- RPC 请求/响应
- 设备删除
## 4. Actor 生命周期管理
### 4.1 Actor 创建
Actor 采用懒加载策略:
```java
// 获取或创建 Actor
TbActorRef getOrCreateChildActor(TbActorId actorId,
Supplier<String> dispatcherName,
Supplier<TbActor> actorCreator,
Supplier<Boolean> shouldCreate);
```
### 4.2 Actor 停止
- **设备删除**: `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG` 消息会停止对应的 DeviceActor
- **租户删除**: `ComponentLifecycleEvent.DELETED` 事件会停止对应的 TenantActor
```java
// 租户删除处理
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}",
msg.getTenantId(), msg);
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
return;
}
```
## 5. Actor 消息类型
### 5.1 传输层消息
- `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层发送到设备 Actor 的消息
- 包含遥测数据、属性更新等
### 5.2 设备管理消息
- `DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备属性更新
- `DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备凭证更新
- `DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备名称/类型更新
- `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG`: 设备删除
- `DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG`: Edge 设备更新
### 5.3 RPC 消息
- `DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG`: 发送到设备的 RPC 请求
- `DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG`: 设备 RPC 响应
- `DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG`: 服务器端 RPC 超时
### 5.4 规则引擎消息
- `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息
- `RULE_CHAIN_INPUT_MSG`: 规则链输入消息
- `RULE_CHAIN_OUTPUT_MSG`: 规则链输出消息
### 5.5 系统消息
- `APP_INIT_MSG`: 应用初始化消息
- `PARTITION_CHANGE_MSG`: 分区变更消息
- `SESSION_TIMEOUT_MSG`: 会话超时消息
- `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息
## 6. Actor 系统配置
### 6.1 Actor 系统上下文
**位置**: `application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java`
Actor 系统上下文包含:
- 各种服务引用DAO、Service等
- 配置信息
- 统计信息
### 6.2 Actor 服务
**位置**: `application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java`
Actor 服务负责:
- Actor 系统的初始化和关闭
- Actor 的创建和管理
- 消息调度
## 7. 总结
ThingsBoard 的 Actor 系统具有以下特点:
1. **层次化设计**: AppActor -> TenantActor -> DeviceActor清晰的层次结构
2. **懒加载**: Actor 按需创建,节省资源
3. **消息驱动**: 所有操作通过消息传递,实现异步处理
4. **优先级支持**: 支持普通和高优先级消息
5. **生命周期管理**: 完善的 Actor 创建和销毁机制
Actor 系统是 ThingsBoard 处理高并发设备连接的核心机制,通过 Actor 模型实现了良好的并发控制和消息处理能力。