# ThingsBoard Actor 系统源码分析 ## 1. 概述 ThingsBoard 使用 Actor 模型来处理并发和消息传递。Actor 系统是 ThingsBoard 的核心架构,负责管理设备、租户和规则链的生命周期。 ## 2. Actor 层次结构 ### 2.1 Actor 层次 ``` AppActor (系统级) └── TenantActor (租户级) ├── DeviceActor (设备级) └── RuleChainActor (规则链级) ``` ### 2.2 Actor 类型 #### 2.2.1 AppActor **位置**: `application/src/main/java/org/thingsboard/server/actors/app/AppActor.java` **作用**: 系统级 Actor,管理所有租户 Actor。 **关键方法**: ```java /** * 初始化租户 Actor * 系统启动时,为所有已存在的租户创建 TenantActor */ private void initTenantActors() { if (systemContext.isTenantComponentsInitEnabled()) { PageDataIterable tenantIterator = new PageDataIterable<>( tenantService::findTenants, ENTITY_PACK_LIMIT); for (Tenant tenant : tenantIterator) { getOrCreateTenantActor(tenant.getId()).ifPresentOrElse( tenantActor -> log.debug("[{}] Tenant actor created.", tenant.getId()), () -> log.debug("[{}] Skipped actor creation", tenant.getId()) ); } } } /** * 获取或创建租户 Actor */ private Optional getOrCreateTenantActor(TenantId tenantId) { if (deletedTenants.contains(tenantId)) { return Optional.empty(); } return Optional.ofNullable(ctx.getOrCreateChildActor( new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId), () -> true)); } /** * 处理发送到设备 Actor 的消息 * 路由到对应的租户 Actor */ private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse( tenantActor -> { if (priority) { tenantActor.tellWithHighPriority(msg); } else { tenantActor.tell(msg); } }, () -> { // 如果租户不存在,直接成功回调 if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); } } ); } ``` **处理的消息类型**: - `APP_INIT_MSG`: 应用初始化消息 - `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层到设备 Actor 的消息 - `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息 - `PARTITION_CHANGE_MSG`: 分区变更消息 - `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息 #### 2.2.2 TenantActor **位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java` **作用**: 租户级 Actor,管理租户下的所有设备 Actor 和规则链 Actor。 **关键方法**: ```java /** * 处理发送到设备 Actor 的消息 * 路由到对应的设备 Actor */ private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) { if (!isCore) { log.warn("RECEIVED INVALID MESSAGE: {}", msg); } if (deletedDevices.contains(msg.getDeviceId())) { log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg); return; } TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId()); if (priority) { deviceActor.tellWithHighPriority(msg); } else { deviceActor.tell(msg); } } /** * 处理发送到规则引擎的消息 */ private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { if (!isRuleEngine) { log.warn("RECEIVED INVALID MESSAGE: {}", msg); return; } TbMsg tbMsg = msg.getMsg(); if (getApiUsageState().isReExecEnabled()) { if (tbMsg.getRuleChainId() == null) { // 使用根规则链 if (getRootChainActor() != null) { getRootChainActor().tell(msg); } else { tbMsg.getCallback().onFailure( new RuleEngineException("No Root Rule Chain available!")); } } else { // 使用指定的规则链 try { ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg); } catch (TbActorNotRegisteredException ex) { log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId()); tbMsg.getCallback().onSuccess(); } } } else { log.trace("[{}] Ack message because Rule Engine is disabled", tenantId); tbMsg.getCallback().onSuccess(); } } ``` #### 2.2.3 DeviceActor **位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java` **作用**: 设备级 Actor,处理单个设备的所有消息。 **关键方法**: ```java /** * 处理消息 */ @Override protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case TRANSPORT_TO_DEVICE_ACTOR_MSG: processor.process((TransportToDeviceActorMsgWrapper) msg); break; case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: processor.processAttributesUpdate((DeviceAttributesEventNotificationMsg) msg); break; case DEVICE_DELETE_TO_DEVICE_ACTOR_MSG: ctx.stop(ctx.getSelf()); break; case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: processor.processCredentialsUpdate(msg); break; case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg); break; case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg); break; case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: processor.processRpcResponsesFromEdge((FromDeviceRpcResponseActorMsg) msg); break; case SESSION_TIMEOUT_MSG: processor.checkSessionsTimeout(); break; // ... 其他消息类型 default: return false; } return true; } ``` **实际处理逻辑**: DeviceActor 使用 `DeviceActorMessageProcessor` 来处理具体的业务逻辑。 ## 3. Actor 消息处理流程 ### 3.1 消息路由流程 1. **传输层消息**: ``` 传输层 -> 消息队列 -> 核心服务消费 -> AppActor -> TenantActor -> DeviceActor ``` 2. **规则引擎消息**: ``` 核心服务 -> 消息队列 -> 规则引擎服务消费 -> AppActor -> TenantActor -> RuleChainActor ``` ### 3.2 消息优先级 Actor 系统支持两种消息优先级: - **普通消息**: `tell(msg)` - 正常优先级 - **高优先级消息**: `tellWithHighPriority(msg)` - 高优先级,优先处理 高优先级消息用于: - 设备凭证更新 - 设备名称/类型更新 - RPC 请求/响应 - 设备删除 ## 4. Actor 生命周期管理 ### 4.1 Actor 创建 Actor 采用懒加载策略: ```java // 获取或创建 Actor TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcherName, Supplier actorCreator, Supplier shouldCreate); ``` ### 4.2 Actor 停止 - **设备删除**: `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG` 消息会停止对应的 DeviceActor - **租户删除**: `ComponentLifecycleEvent.DELETED` 事件会停止对应的 TenantActor ```java // 租户删除处理 if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); deletedTenants.add(tenantId); ctx.stop(new TbEntityActorId(tenantId)); return; } ``` ## 5. Actor 消息类型 ### 5.1 传输层消息 - `TRANSPORT_TO_DEVICE_ACTOR_MSG`: 传输层发送到设备 Actor 的消息 - 包含遥测数据、属性更新等 ### 5.2 设备管理消息 - `DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备属性更新 - `DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备凭证更新 - `DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG`: 设备名称/类型更新 - `DEVICE_DELETE_TO_DEVICE_ACTOR_MSG`: 设备删除 - `DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG`: Edge 设备更新 ### 5.3 RPC 消息 - `DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG`: 发送到设备的 RPC 请求 - `DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG`: 设备 RPC 响应 - `DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG`: 服务器端 RPC 超时 ### 5.4 规则引擎消息 - `QUEUE_TO_RULE_ENGINE_MSG`: 发送到规则引擎的消息 - `RULE_CHAIN_INPUT_MSG`: 规则链输入消息 - `RULE_CHAIN_OUTPUT_MSG`: 规则链输出消息 ### 5.5 系统消息 - `APP_INIT_MSG`: 应用初始化消息 - `PARTITION_CHANGE_MSG`: 分区变更消息 - `SESSION_TIMEOUT_MSG`: 会话超时消息 - `COMPONENT_LIFE_CYCLE_MSG`: 组件生命周期消息 ## 6. Actor 系统配置 ### 6.1 Actor 系统上下文 **位置**: `application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java` Actor 系统上下文包含: - 各种服务引用(DAO、Service等) - 配置信息 - 统计信息 ### 6.2 Actor 服务 **位置**: `application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java` Actor 服务负责: - Actor 系统的初始化和关闭 - Actor 的创建和管理 - 消息调度 ## 7. 总结 ThingsBoard 的 Actor 系统具有以下特点: 1. **层次化设计**: AppActor -> TenantActor -> DeviceActor,清晰的层次结构 2. **懒加载**: Actor 按需创建,节省资源 3. **消息驱动**: 所有操作通过消息传递,实现异步处理 4. **优先级支持**: 支持普通和高优先级消息 5. **生命周期管理**: 完善的 Actor 创建和销毁机制 Actor 系统是 ThingsBoard 处理高并发设备连接的核心机制,通过 Actor 模型实现了良好的并发控制和消息处理能力。