9.8 KiB
ThingsBoard Actor 系统源码分析
1. 概述
ThingsBoard 使用 Actor 模型来处理并发和消息传递。Actor 系统是 ThingsBoard 的核心架构,负责管理设备、租户和规则链的生命周期。
2. Actor 层次结构
2.1 Actor 层次
AppActor (系统级)
└── TenantActor (租户级)
├── DeviceActor (设备级)
└── RuleChainActor (规则链级)
2.2 Actor 类型
2.2.1 AppActor
位置: application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
作用: 系统级 Actor,管理所有租户 Actor。
关键方法:
/**
* 初始化租户 Actor
* 系统启动时,为所有已存在的租户创建 TenantActor
*/
private void initTenantActors() {
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(
tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(
tenantActor -> log.debug("[{}] Tenant actor created.", tenant.getId()),
() -> log.debug("[{}] Skipped actor creation", tenant.getId())
);
}
}
}
/**
* 获取或创建租户 Actor
*/
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
if (deletedTenants.contains(tenantId)) {
return Optional.empty();
}
return Optional.ofNullable(ctx.getOrCreateChildActor(
new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId),
() -> true));
}
/**
* 处理发送到设备 Actor 的消息
* 路由到对应的租户 Actor
*/
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
},
() -> {
// 如果租户不存在,直接成功回调
if (msg instanceof TransportToDeviceActorMsgWrapper) {
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
}
}
);
}
处理的消息类型:
APP_INIT_MSG: 应用初始化消息TRANSPORT_TO_DEVICE_ACTOR_MSG: 传输层到设备 Actor 的消息QUEUE_TO_RULE_ENGINE_MSG: 发送到规则引擎的消息PARTITION_CHANGE_MSG: 分区变更消息COMPONENT_LIFE_CYCLE_MSG: 组件生命周期消息
2.2.2 TenantActor
位置: application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
作用: 租户级 Actor,管理租户下的所有设备 Actor 和规则链 Actor。
关键方法:
/**
* 处理发送到设备 Actor 的消息
* 路由到对应的设备 Actor
*/
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
if (!isCore) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
}
if (deletedDevices.contains(msg.getDeviceId())) {
log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
return;
}
TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
if (priority) {
deviceActor.tellWithHighPriority(msg);
} else {
deviceActor.tell(msg);
}
}
/**
* 处理发送到规则引擎的消息
*/
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (!isRuleEngine) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getMsg();
if (getApiUsageState().isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
// 使用根规则链
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
} else {
tbMsg.getCallback().onFailure(
new RuleEngineException("No Root Rule Chain available!"));
}
} else {
// 使用指定的规则链
try {
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
} catch (TbActorNotRegisteredException ex) {
log.trace("Received message for non-existing rule chain: [{}]",
tbMsg.getRuleChainId());
tbMsg.getCallback().onSuccess();
}
}
} else {
log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
tbMsg.getCallback().onSuccess();
}
}
2.2.3 DeviceActor
位置: application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
作用: 设备级 Actor,处理单个设备的所有消息。
关键方法:
/**
* 处理消息
*/
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
processor.process((TransportToDeviceActorMsgWrapper) msg);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processAttributesUpdate((DeviceAttributesEventNotificationMsg) msg);
break;
case DEVICE_DELETE_TO_DEVICE_ACTOR_MSG:
ctx.stop(ctx.getSelf());
break;
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processCredentialsUpdate(msg);
break;
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
break;
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
break;
case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
processor.processRpcResponsesFromEdge((FromDeviceRpcResponseActorMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
processor.checkSessionsTimeout();
break;
// ... 其他消息类型
default:
return false;
}
return true;
}
实际处理逻辑: DeviceActor 使用 DeviceActorMessageProcessor 来处理具体的业务逻辑。
3. Actor 消息处理流程
3.1 消息路由流程
-
传输层消息:
传输层 -> 消息队列 -> 核心服务消费 -> AppActor -> TenantActor -> DeviceActor -
规则引擎消息:
核心服务 -> 消息队列 -> 规则引擎服务消费 -> AppActor -> TenantActor -> RuleChainActor
3.2 消息优先级
Actor 系统支持两种消息优先级:
- 普通消息:
tell(msg)- 正常优先级 - 高优先级消息:
tellWithHighPriority(msg)- 高优先级,优先处理
高优先级消息用于:
- 设备凭证更新
- 设备名称/类型更新
- RPC 请求/响应
- 设备删除
4. Actor 生命周期管理
4.1 Actor 创建
Actor 采用懒加载策略:
// 获取或创建 Actor
TbActorRef getOrCreateChildActor(TbActorId actorId,
Supplier<String> dispatcherName,
Supplier<TbActor> actorCreator,
Supplier<Boolean> shouldCreate);
4.2 Actor 停止
- 设备删除:
DEVICE_DELETE_TO_DEVICE_ACTOR_MSG消息会停止对应的 DeviceActor - 租户删除:
ComponentLifecycleEvent.DELETED事件会停止对应的 TenantActor
// 租户删除处理
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}",
msg.getTenantId(), msg);
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
return;
}
5. Actor 消息类型
5.1 传输层消息
TRANSPORT_TO_DEVICE_ACTOR_MSG: 传输层发送到设备 Actor 的消息- 包含遥测数据、属性更新等
5.2 设备管理消息
DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: 设备属性更新DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: 设备凭证更新DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: 设备名称/类型更新DEVICE_DELETE_TO_DEVICE_ACTOR_MSG: 设备删除DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: Edge 设备更新
5.3 RPC 消息
DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 发送到设备的 RPC 请求DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 设备 RPC 响应DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG: 服务器端 RPC 超时
5.4 规则引擎消息
QUEUE_TO_RULE_ENGINE_MSG: 发送到规则引擎的消息RULE_CHAIN_INPUT_MSG: 规则链输入消息RULE_CHAIN_OUTPUT_MSG: 规则链输出消息
5.5 系统消息
APP_INIT_MSG: 应用初始化消息PARTITION_CHANGE_MSG: 分区变更消息SESSION_TIMEOUT_MSG: 会话超时消息COMPONENT_LIFE_CYCLE_MSG: 组件生命周期消息
6. Actor 系统配置
6.1 Actor 系统上下文
位置: application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
Actor 系统上下文包含:
- 各种服务引用(DAO、Service等)
- 配置信息
- 统计信息
6.2 Actor 服务
位置: application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
Actor 服务负责:
- Actor 系统的初始化和关闭
- Actor 的创建和管理
- 消息调度
7. 总结
ThingsBoard 的 Actor 系统具有以下特点:
- 层次化设计: AppActor -> TenantActor -> DeviceActor,清晰的层次结构
- 懒加载: Actor 按需创建,节省资源
- 消息驱动: 所有操作通过消息传递,实现异步处理
- 优先级支持: 支持普通和高优先级消息
- 生命周期管理: 完善的 Actor 创建和销毁机制
Actor 系统是 ThingsBoard 处理高并发设备连接的核心机制,通过 Actor 模型实现了良好的并发控制和消息处理能力。