326 lines
9.8 KiB
Markdown
326 lines
9.8 KiB
Markdown
|
|
# ThingsBoard Actor 系统源码分析
|
|||
|
|
|
|||
|
|
## 1. 概述
|
|||
|
|
|
|||
|
|
ThingsBoard 使用 Actor 模型来处理并发和消息传递。Actor 系统是 ThingsBoard 的核心架构,负责管理设备、租户和规则链的生命周期。
|
|||
|
|
|
|||
|
|
## 2. Actor 层次结构
|
|||
|
|
|
|||
|
|
### 2.1 Actor 层次
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
AppActor (系统级)
|
|||
|
|
└── TenantActor (租户级)
|
|||
|
|
├── DeviceActor (设备级)
|
|||
|
|
└── RuleChainActor (规则链级)
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 2.2 Actor 类型
|
|||
|
|
|
|||
|
|
#### 2.2.1 AppActor
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/app/AppActor.java`
|
|||
|
|
|
|||
|
|
**作用**: 系统级 Actor,管理所有租户 Actor。
|
|||
|
|
|
|||
|
|
**关键方法**:
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
/**
|
|||
|
|
* 初始化租户 Actor
|
|||
|
|
* 系统启动时,为所有已存在的租户创建 TenantActor
|
|||
|
|
*/
|
|||
|
|
private void initTenantActors() {
|
|||
|
|
if (systemContext.isTenantComponentsInitEnabled()) {
|
|||
|
|
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(
|
|||
|
|
tenantService::findTenants, ENTITY_PACK_LIMIT);
|
|||
|
|
for (Tenant tenant : tenantIterator) {
|
|||
|
|
getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(
|
|||
|
|
tenantActor -> log.debug("[{}] Tenant actor created.", tenant.getId()),
|
|||
|
|
() -> log.debug("[{}] Skipped actor creation", tenant.getId())
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 获取或创建租户 Actor
|
|||
|
|
*/
|
|||
|
|
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
|
|||
|
|
if (deletedTenants.contains(tenantId)) {
|
|||
|
|
return Optional.empty();
|
|||
|
|
}
|
|||
|
|
return Optional.ofNullable(ctx.getOrCreateChildActor(
|
|||
|
|
new TbEntityActorId(tenantId),
|
|||
|
|
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
|
|||
|
|
() -> new TenantActor.ActorCreator(systemContext, tenantId),
|
|||
|
|
() -> true));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 处理发送到设备 Actor 的消息
|
|||
|
|
* 路由到对应的租户 Actor
|
|||
|
|
*/
|
|||
|
|
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
|
|||
|
|
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
|
|||
|
|
tenantActor -> {
|
|||
|
|
if (priority) {
|
|||
|
|
tenantActor.tellWithHighPriority(msg);
|
|||
|
|
} else {
|
|||
|
|
tenantActor.tell(msg);
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
() -> {
|
|||
|
|
// 如果租户不存在,直接成功回调
|
|||
|
|
if (msg instanceof TransportToDeviceActorMsgWrapper) {
|
|||
|
|
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**处理的消息类型**:
|
|||
|
|
- `APP_INIT_MSG`: 应用初始化消息
|
|||
|
|
- `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层到设备 Actor 的消息
|
|||
|
|
- `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息
|
|||
|
|
- `PARTITION_CHANGE_MSG`: 分区变更消息
|
|||
|
|
- `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息
|
|||
|
|
|
|||
|
|
#### 2.2.2 TenantActor
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`
|
|||
|
|
|
|||
|
|
**作用**: 租户级 Actor,管理租户下的所有设备 Actor 和规则链 Actor。
|
|||
|
|
|
|||
|
|
**关键方法**:
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
/**
|
|||
|
|
* 处理发送到设备 Actor 的消息
|
|||
|
|
* 路由到对应的设备 Actor
|
|||
|
|
*/
|
|||
|
|
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
|
|||
|
|
if (!isCore) {
|
|||
|
|
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
|
|||
|
|
}
|
|||
|
|
if (deletedDevices.contains(msg.getDeviceId())) {
|
|||
|
|
log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
|
|||
|
|
if (priority) {
|
|||
|
|
deviceActor.tellWithHighPriority(msg);
|
|||
|
|
} else {
|
|||
|
|
deviceActor.tell(msg);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* 处理发送到规则引擎的消息
|
|||
|
|
*/
|
|||
|
|
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
|
|||
|
|
if (!isRuleEngine) {
|
|||
|
|
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
TbMsg tbMsg = msg.getMsg();
|
|||
|
|
if (getApiUsageState().isReExecEnabled()) {
|
|||
|
|
if (tbMsg.getRuleChainId() == null) {
|
|||
|
|
// 使用根规则链
|
|||
|
|
if (getRootChainActor() != null) {
|
|||
|
|
getRootChainActor().tell(msg);
|
|||
|
|
} else {
|
|||
|
|
tbMsg.getCallback().onFailure(
|
|||
|
|
new RuleEngineException("No Root Rule Chain available!"));
|
|||
|
|
}
|
|||
|
|
} else {
|
|||
|
|
// 使用指定的规则链
|
|||
|
|
try {
|
|||
|
|
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
|
|||
|
|
} catch (TbActorNotRegisteredException ex) {
|
|||
|
|
log.trace("Received message for non-existing rule chain: [{}]",
|
|||
|
|
tbMsg.getRuleChainId());
|
|||
|
|
tbMsg.getCallback().onSuccess();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
} else {
|
|||
|
|
log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
|
|||
|
|
tbMsg.getCallback().onSuccess();
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 2.2.3 DeviceActor
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`
|
|||
|
|
|
|||
|
|
**作用**: 设备级 Actor,处理单个设备的所有消息。
|
|||
|
|
|
|||
|
|
**关键方法**:
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
/**
|
|||
|
|
* 处理消息
|
|||
|
|
*/
|
|||
|
|
@Override
|
|||
|
|
protected boolean doProcess(TbActorMsg msg) {
|
|||
|
|
switch (msg.getMsgType()) {
|
|||
|
|
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
processor.process((TransportToDeviceActorMsgWrapper) msg);
|
|||
|
|
break;
|
|||
|
|
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
processor.processAttributesUpdate((DeviceAttributesEventNotificationMsg) msg);
|
|||
|
|
break;
|
|||
|
|
case DEVICE_DELETE_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
ctx.stop(ctx.getSelf());
|
|||
|
|
break;
|
|||
|
|
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
processor.processCredentialsUpdate(msg);
|
|||
|
|
break;
|
|||
|
|
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
|
|||
|
|
break;
|
|||
|
|
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
|
|||
|
|
break;
|
|||
|
|
case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
|
|||
|
|
processor.processRpcResponsesFromEdge((FromDeviceRpcResponseActorMsg) msg);
|
|||
|
|
break;
|
|||
|
|
case SESSION_TIMEOUT_MSG:
|
|||
|
|
processor.checkSessionsTimeout();
|
|||
|
|
break;
|
|||
|
|
// ... 其他消息类型
|
|||
|
|
default:
|
|||
|
|
return false;
|
|||
|
|
}
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**实际处理逻辑**: DeviceActor 使用 `DeviceActorMessageProcessor` 来处理具体的业务逻辑。
|
|||
|
|
|
|||
|
|
## 3. Actor 消息处理流程
|
|||
|
|
|
|||
|
|
### 3.1 消息路由流程
|
|||
|
|
|
|||
|
|
1. **传输层消息**:
|
|||
|
|
```
|
|||
|
|
传输层 -> 消息队列 -> 核心服务消费 -> AppActor -> TenantActor -> DeviceActor
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
2. **规则引擎消息**:
|
|||
|
|
```
|
|||
|
|
核心服务 -> 消息队列 -> 规则引擎服务消费 -> AppActor -> TenantActor -> RuleChainActor
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 3.2 消息优先级
|
|||
|
|
|
|||
|
|
Actor 系统支持两种消息优先级:
|
|||
|
|
|
|||
|
|
- **普通消息**: `tell(msg)` - 正常优先级
|
|||
|
|
- **高优先级消息**: `tellWithHighPriority(msg)` - 高优先级,优先处理
|
|||
|
|
|
|||
|
|
高优先级消息用于:
|
|||
|
|
- 设备凭证更新
|
|||
|
|
- 设备名称/类型更新
|
|||
|
|
- RPC 请求/响应
|
|||
|
|
- 设备删除
|
|||
|
|
|
|||
|
|
## 4. Actor 生命周期管理
|
|||
|
|
|
|||
|
|
### 4.1 Actor 创建
|
|||
|
|
|
|||
|
|
Actor 采用懒加载策略:
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
// 获取或创建 Actor
|
|||
|
|
TbActorRef getOrCreateChildActor(TbActorId actorId,
|
|||
|
|
Supplier<String> dispatcherName,
|
|||
|
|
Supplier<TbActor> actorCreator,
|
|||
|
|
Supplier<Boolean> shouldCreate);
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 4.2 Actor 停止
|
|||
|
|
|
|||
|
|
- **设备删除**: `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG` 消息会停止对应的 DeviceActor
|
|||
|
|
- **租户删除**: `ComponentLifecycleEvent.DELETED` 事件会停止对应的 TenantActor
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
// 租户删除处理
|
|||
|
|
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
|
|||
|
|
log.info("[{}] Handling tenant deleted notification: {}",
|
|||
|
|
msg.getTenantId(), msg);
|
|||
|
|
deletedTenants.add(tenantId);
|
|||
|
|
ctx.stop(new TbEntityActorId(tenantId));
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 5. Actor 消息类型
|
|||
|
|
|
|||
|
|
### 5.1 传输层消息
|
|||
|
|
|
|||
|
|
- `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层发送到设备 Actor 的消息
|
|||
|
|
- 包含遥测数据、属性更新等
|
|||
|
|
|
|||
|
|
### 5.2 设备管理消息
|
|||
|
|
|
|||
|
|
- `DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备属性更新
|
|||
|
|
- `DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备凭证更新
|
|||
|
|
- `DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备名称/类型更新
|
|||
|
|
- `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG`: 设备删除
|
|||
|
|
- `DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG`: Edge 设备更新
|
|||
|
|
|
|||
|
|
### 5.3 RPC 消息
|
|||
|
|
|
|||
|
|
- `DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG`: 发送到设备的 RPC 请求
|
|||
|
|
- `DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG`: 设备 RPC 响应
|
|||
|
|
- `DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG`: 服务器端 RPC 超时
|
|||
|
|
|
|||
|
|
### 5.4 规则引擎消息
|
|||
|
|
|
|||
|
|
- `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息
|
|||
|
|
- `RULE_CHAIN_INPUT_MSG`: 规则链输入消息
|
|||
|
|
- `RULE_CHAIN_OUTPUT_MSG`: 规则链输出消息
|
|||
|
|
|
|||
|
|
### 5.5 系统消息
|
|||
|
|
|
|||
|
|
- `APP_INIT_MSG`: 应用初始化消息
|
|||
|
|
- `PARTITION_CHANGE_MSG`: 分区变更消息
|
|||
|
|
- `SESSION_TIMEOUT_MSG`: 会话超时消息
|
|||
|
|
- `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息
|
|||
|
|
|
|||
|
|
## 6. Actor 系统配置
|
|||
|
|
|
|||
|
|
### 6.1 Actor 系统上下文
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java`
|
|||
|
|
|
|||
|
|
Actor 系统上下文包含:
|
|||
|
|
- 各种服务引用(DAO、Service等)
|
|||
|
|
- 配置信息
|
|||
|
|
- 统计信息
|
|||
|
|
|
|||
|
|
### 6.2 Actor 服务
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java`
|
|||
|
|
|
|||
|
|
Actor 服务负责:
|
|||
|
|
- Actor 系统的初始化和关闭
|
|||
|
|
- Actor 的创建和管理
|
|||
|
|
- 消息调度
|
|||
|
|
|
|||
|
|
## 7. 总结
|
|||
|
|
|
|||
|
|
ThingsBoard 的 Actor 系统具有以下特点:
|
|||
|
|
|
|||
|
|
1. **层次化设计**: AppActor -> TenantActor -> DeviceActor,清晰的层次结构
|
|||
|
|
2. **懒加载**: Actor 按需创建,节省资源
|
|||
|
|
3. **消息驱动**: 所有操作通过消息传递,实现异步处理
|
|||
|
|
4. **优先级支持**: 支持普通和高优先级消息
|
|||
|
|
5. **生命周期管理**: 完善的 Actor 创建和销毁机制
|
|||
|
|
|
|||
|
|
Actor 系统是 ThingsBoard 处理高并发设备连接的核心机制,通过 Actor 模型实现了良好的并发控制和消息处理能力。
|
|||
|
|
|