thingsboard/summary/03-核心模块源码分析/01-Actor系统分析.md

9.8 KiB
Raw Permalink Blame History

ThingsBoard Actor 系统源码分析

1. 概述

ThingsBoard 使用 Actor 模型来处理并发和消息传递。Actor 系统是 ThingsBoard 的核心架构,负责管理设备、租户和规则链的生命周期。

2. Actor 层次结构

2.1 Actor 层次

AppActor (系统级)
  └── TenantActor (租户级)
      ├── DeviceActor (设备级)
      └── RuleChainActor (规则链级)

2.2 Actor 类型

2.2.1 AppActor

位置: application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

作用: 系统级 Actor管理所有租户 Actor。

关键方法:

/**
 * 初始化租户 Actor
 * 系统启动时,为所有已存在的租户创建 TenantActor
 */
private void initTenantActors() {
    if (systemContext.isTenantComponentsInitEnabled()) {
        PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(
            tenantService::findTenants, ENTITY_PACK_LIMIT);
        for (Tenant tenant : tenantIterator) {
            getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(
                tenantActor -> log.debug("[{}] Tenant actor created.", tenant.getId()),
                () -> log.debug("[{}] Skipped actor creation", tenant.getId())
            );
        }
    }
}

/**
 * 获取或创建租户 Actor
 */
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
    if (deletedTenants.contains(tenantId)) {
        return Optional.empty();
    }
    return Optional.ofNullable(ctx.getOrCreateChildActor(
        new TbEntityActorId(tenantId),
        () -> DefaultActorService.TENANT_DISPATCHER_NAME,
        () -> new TenantActor.ActorCreator(systemContext, tenantId),
        () -> true));
}

/**
 * 处理发送到设备 Actor 的消息
 * 路由到对应的租户 Actor
 */
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
    getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
        tenantActor -> {
            if (priority) {
                tenantActor.tellWithHighPriority(msg);
            } else {
                tenantActor.tell(msg);
            }
        },
        () -> {
            // 如果租户不存在,直接成功回调
            if (msg instanceof TransportToDeviceActorMsgWrapper) {
                ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
            }
        }
    );
}

处理的消息类型:

  • APP_INIT_MSG: 应用初始化消息
  • TRANSPORT_TO_DEVICE_ACTOR_MSG: 传输层到设备 Actor 的消息
  • QUEUE_TO_RULE_ENGINE_MSG: 发送到规则引擎的消息
  • PARTITION_CHANGE_MSG: 分区变更消息
  • COMPONENT_LIFE_CYCLE_MSG: 组件生命周期消息

2.2.2 TenantActor

位置: application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

作用: 租户级 Actor管理租户下的所有设备 Actor 和规则链 Actor。

关键方法:

/**
 * 处理发送到设备 Actor 的消息
 * 路由到对应的设备 Actor
 */
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
    if (!isCore) {
        log.warn("RECEIVED INVALID MESSAGE: {}", msg);
    }
    if (deletedDevices.contains(msg.getDeviceId())) {
        log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
        return;
    }
    TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
    if (priority) {
        deviceActor.tellWithHighPriority(msg);
    } else {
        deviceActor.tell(msg);
    }
}

/**
 * 处理发送到规则引擎的消息
 */
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
    if (!isRuleEngine) {
        log.warn("RECEIVED INVALID MESSAGE: {}", msg);
        return;
    }
    TbMsg tbMsg = msg.getMsg();
    if (getApiUsageState().isReExecEnabled()) {
        if (tbMsg.getRuleChainId() == null) {
            // 使用根规则链
            if (getRootChainActor() != null) {
                getRootChainActor().tell(msg);
            } else {
                tbMsg.getCallback().onFailure(
                    new RuleEngineException("No Root Rule Chain available!"));
            }
        } else {
            // 使用指定的规则链
            try {
                ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
            } catch (TbActorNotRegisteredException ex) {
                log.trace("Received message for non-existing rule chain: [{}]", 
                    tbMsg.getRuleChainId());
                tbMsg.getCallback().onSuccess();
            }
        }
    } else {
        log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
        tbMsg.getCallback().onSuccess();
    }
}

2.2.3 DeviceActor

位置: application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java

作用: 设备级 Actor处理单个设备的所有消息。

关键方法:

/**
 * 处理消息
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            processor.process((TransportToDeviceActorMsgWrapper) msg);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
            processor.processAttributesUpdate((DeviceAttributesEventNotificationMsg) msg);
            break;
        case DEVICE_DELETE_TO_DEVICE_ACTOR_MSG:
            ctx.stop(ctx.getSelf());
            break;
        case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
            processor.processCredentialsUpdate(msg);
            break;
        case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
            processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
            break;
        case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
            processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
            break;
        case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
            processor.processRpcResponsesFromEdge((FromDeviceRpcResponseActorMsg) msg);
            break;
        case SESSION_TIMEOUT_MSG:
            processor.checkSessionsTimeout();
            break;
        // ... 其他消息类型
        default:
            return false;
    }
    return true;
}

实际处理逻辑: DeviceActor 使用 DeviceActorMessageProcessor 来处理具体的业务逻辑。

3. Actor 消息处理流程

3.1 消息路由流程

  1. 传输层消息:

    传输层 -> 消息队列 -> 核心服务消费 -> AppActor -> TenantActor -> DeviceActor
    
  2. 规则引擎消息:

    核心服务 -> 消息队列 -> 规则引擎服务消费 -> AppActor -> TenantActor -> RuleChainActor
    

3.2 消息优先级

Actor 系统支持两种消息优先级:

  • 普通消息: tell(msg) - 正常优先级
  • 高优先级消息: tellWithHighPriority(msg) - 高优先级,优先处理

高优先级消息用于:

  • 设备凭证更新
  • 设备名称/类型更新
  • RPC 请求/响应
  • 设备删除

4. Actor 生命周期管理

4.1 Actor 创建

Actor 采用懒加载策略:

// 获取或创建 Actor
TbActorRef getOrCreateChildActor(TbActorId actorId, 
                                  Supplier<String> dispatcherName,
                                  Supplier<TbActor> actorCreator,
                                  Supplier<Boolean> shouldCreate);

4.2 Actor 停止

  • 设备删除: DEVICE_DELETE_TO_DEVICE_ACTOR_MSG 消息会停止对应的 DeviceActor
  • 租户删除: ComponentLifecycleEvent.DELETED 事件会停止对应的 TenantActor
// 租户删除处理
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
    log.info("[{}] Handling tenant deleted notification: {}", 
        msg.getTenantId(), msg);
    deletedTenants.add(tenantId);
    ctx.stop(new TbEntityActorId(tenantId));
    return;
}

5. Actor 消息类型

5.1 传输层消息

  • TRANSPORT_TO_DEVICE_ACTOR_MSG: 传输层发送到设备 Actor 的消息
    • 包含遥测数据、属性更新等

5.2 设备管理消息

  • DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: 设备属性更新
  • DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: 设备凭证更新
  • DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: 设备名称/类型更新
  • DEVICE_DELETE_TO_DEVICE_ACTOR_MSG: 设备删除
  • DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: Edge 设备更新

5.3 RPC 消息

  • DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: 发送到设备的 RPC 请求
  • DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: 设备 RPC 响应
  • DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG: 服务器端 RPC 超时

5.4 规则引擎消息

  • QUEUE_TO_RULE_ENGINE_MSG: 发送到规则引擎的消息
  • RULE_CHAIN_INPUT_MSG: 规则链输入消息
  • RULE_CHAIN_OUTPUT_MSG: 规则链输出消息

5.5 系统消息

  • APP_INIT_MSG: 应用初始化消息
  • PARTITION_CHANGE_MSG: 分区变更消息
  • SESSION_TIMEOUT_MSG: 会话超时消息
  • COMPONENT_LIFE_CYCLE_MSG: 组件生命周期消息

6. Actor 系统配置

6.1 Actor 系统上下文

位置: application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

Actor 系统上下文包含:

  • 各种服务引用DAO、Service等
  • 配置信息
  • 统计信息

6.2 Actor 服务

位置: application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java

Actor 服务负责:

  • Actor 系统的初始化和关闭
  • Actor 的创建和管理
  • 消息调度

7. 总结

ThingsBoard 的 Actor 系统具有以下特点:

  1. 层次化设计: AppActor -> TenantActor -> DeviceActor清晰的层次结构
  2. 懒加载: Actor 按需创建,节省资源
  3. 消息驱动: 所有操作通过消息传递,实现异步处理
  4. 优先级支持: 支持普通和高优先级消息
  5. 生命周期管理: 完善的 Actor 创建和销毁机制

Actor 系统是 ThingsBoard 处理高并发设备连接的核心机制,通过 Actor 模型实现了良好的并发控制和消息处理能力。