thingsboard/summary/03-核心模块源码分析/01-Actor系统分析.md

326 lines
9.8 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ThingsBoard Actor 系统源码分析
## 1. 概述
ThingsBoard 使用 Actor 模型来处理并发和消息传递。Actor 系统是 ThingsBoard 的核心架构,负责管理设备、租户和规则链的生命周期。
## 2. Actor 层次结构
### 2.1 Actor 层次
```
AppActor (系统级)
└── TenantActor (租户级)
├── DeviceActor (设备级)
└── RuleChainActor (规则链级)
```
### 2.2 Actor 类型
#### 2.2.1 AppActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/app/AppActor.java`
**作用**: 系统级 Actor管理所有租户 Actor。
**关键方法**:
```java
/**
* 初始化租户 Actor
* 系统启动时,为所有已存在的租户创建 TenantActor
*/
private void initTenantActors() {
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(
tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(
tenantActor -> log.debug("[{}] Tenant actor created.", tenant.getId()),
() -> log.debug("[{}] Skipped actor creation", tenant.getId())
);
}
}
}
/**
* 获取或创建租户 Actor
*/
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
if (deletedTenants.contains(tenantId)) {
return Optional.empty();
}
return Optional.ofNullable(ctx.getOrCreateChildActor(
new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId),
() -> true));
}
/**
* 处理发送到设备 Actor 的消息
* 路由到对应的租户 Actor
*/
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
},
() -> {
// 如果租户不存在,直接成功回调
if (msg instanceof TransportToDeviceActorMsgWrapper) {
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
}
}
);
}
```
**处理的消息类型**:
- `APP_INIT_MSG`: 应用初始化消息
- `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层到设备 Actor 的消息
- `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息
- `PARTITION_CHANGE_MSG`: 分区变更消息
- `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息
#### 2.2.2 TenantActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`
**作用**: 租户级 Actor管理租户下的所有设备 Actor 和规则链 Actor。
**关键方法**:
```java
/**
* 处理发送到设备 Actor 的消息
* 路由到对应的设备 Actor
*/
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
if (!isCore) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
}
if (deletedDevices.contains(msg.getDeviceId())) {
log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
return;
}
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
if (priority) {
deviceActor.tellWithHighPriority(msg);
} else {
deviceActor.tell(msg);
}
}
/**
* 处理发送到规则引擎的消息
*/
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (!isRuleEngine) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getMsg();
if (getApiUsageState().isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
// 使用根规则链
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
} else {
tbMsg.getCallback().onFailure(
new RuleEngineException("No Root Rule Chain available!"));
}
} else {
// 使用指定的规则链
try {
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
} catch (TbActorNotRegisteredException ex) {
log.trace("Received message for non-existing rule chain: [{}]",
tbMsg.getRuleChainId());
tbMsg.getCallback().onSuccess();
}
}
} else {
log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
tbMsg.getCallback().onSuccess();
}
}
```
#### 2.2.3 DeviceActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`
**作用**: 设备级 Actor处理单个设备的所有消息。
**关键方法**:
```java
/**
* 处理消息
*/
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
processor.process((TransportToDeviceActorMsgWrapper) msg);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processAttributesUpdate((DeviceAttributesEventNotificationMsg) msg);
break;
case DEVICE_DELETE_TO_DEVICE_ACTOR_MSG:
ctx.stop(ctx.getSelf());
break;
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processCredentialsUpdate(msg);
break;
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
break;
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
break;
case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
processor.processRpcResponsesFromEdge((FromDeviceRpcResponseActorMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
processor.checkSessionsTimeout();
break;
// ... 其他消息类型
default:
return false;
}
return true;
}
```
**实际处理逻辑**: DeviceActor 使用 `DeviceActorMessageProcessor` 来处理具体的业务逻辑。
## 3. Actor 消息处理流程
### 3.1 消息路由流程
1. **传输层消息**:
```
传输层 -> 消息队列 -> 核心服务消费 -> AppActor -> TenantActor -> DeviceActor
```
2. **规则引擎消息**:
```
核心服务 -> 消息队列 -> 规则引擎服务消费 -> AppActor -> TenantActor -> RuleChainActor
```
### 3.2 消息优先级
Actor 系统支持两种消息优先级:
- **普通消息**: `tell(msg)` - 正常优先级
- **高优先级消息**: `tellWithHighPriority(msg)` - 高优先级,优先处理
高优先级消息用于:
- 设备凭证更新
- 设备名称/类型更新
- RPC 请求/响应
- 设备删除
## 4. Actor 生命周期管理
### 4.1 Actor 创建
Actor 采用懒加载策略:
```java
// 获取或创建 Actor
TbActorRef getOrCreateChildActor(TbActorId actorId,
Supplier<String> dispatcherName,
Supplier<TbActor> actorCreator,
Supplier<Boolean> shouldCreate);
```
### 4.2 Actor 停止
- **设备删除**: `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG` 消息会停止对应的 DeviceActor
- **租户删除**: `ComponentLifecycleEvent.DELETED` 事件会停止对应的 TenantActor
```java
// 租户删除处理
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}",
msg.getTenantId(), msg);
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
return;
}
```
## 5. Actor 消息类型
### 5.1 传输层消息
- `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层发送到设备 Actor 的消息
- 包含遥测数据、属性更新等
### 5.2 设备管理消息
- `DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备属性更新
- `DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备凭证更新
- `DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备名称/类型更新
- `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG`: 设备删除
- `DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG`: Edge 设备更新
### 5.3 RPC 消息
- `DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG`: 发送到设备的 RPC 请求
- `DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG`: 设备 RPC 响应
- `DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG`: 服务器端 RPC 超时
### 5.4 规则引擎消息
- `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息
- `RULE_CHAIN_INPUT_MSG`: 规则链输入消息
- `RULE_CHAIN_OUTPUT_MSG`: 规则链输出消息
### 5.5 系统消息
- `APP_INIT_MSG`: 应用初始化消息
- `PARTITION_CHANGE_MSG`: 分区变更消息
- `SESSION_TIMEOUT_MSG`: 会话超时消息
- `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息
## 6. Actor 系统配置
### 6.1 Actor 系统上下文
**位置**: `application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java`
Actor 系统上下文包含:
- 各种服务引用DAO、Service等
- 配置信息
- 统计信息
### 6.2 Actor 服务
**位置**: `application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java`
Actor 服务负责:
- Actor 系统的初始化和关闭
- Actor 的创建和管理
- 消息调度
## 7. 总结
ThingsBoard 的 Actor 系统具有以下特点:
1. **层次化设计**: AppActor -> TenantActor -> DeviceActor清晰的层次结构
2. **懒加载**: Actor 按需创建,节省资源
3. **消息驱动**: 所有操作通过消息传递,实现异步处理
4. **优先级支持**: 支持普通和高优先级消息
5. **生命周期管理**: 完善的 Actor 创建和销毁机制
Actor 系统是 ThingsBoard 处理高并发设备连接的核心机制,通过 Actor 模型实现了良好的并发控制和消息处理能力。