19 KiB
tb-rule-engine 流量分析
1. 镜像概述
tb-rule-engine 是 ThingsBoard 的规则引擎服务,负责:
- 接收来自核心服务的消息
- 执行规则链(Rule Chain)
- 处理规则节点(Rule Node)
- 执行JavaScript脚本(调用tb-js-executor)
- 触发告警、通知等操作
- 保存处理结果到数据库
镜像名称: thingsboard/tb-node
服务类型: 通过环境变量 TB_SERVICE_TYPE=tb-rule-engine 区分
源码位置: application/ 目录(与tb-core使用相同代码,通过服务类型区分)
2. 流量入口
2.1 Kafka 消息入口
入口点:
application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java
Kafka主题: tb.rule-engine.*
流量来源:
- tb-core: 核心服务推送的设备数据、属性更新、事件等
消息类型:
ToRuleEngineMsg: 包含TbMsg消息TbMsg: 规则引擎处理的消息实体,包含:- 消息类型(TELEMETRY、ATTRIBUTES_UPDATED、RPC_CALL_FROM_SERVER等)
- 消息数据(JSON格式)
- 元数据(设备ID、租户ID等)
- 规则链ID
3. 流量处理流程
3.1 完整处理流程
Kafka (tb.rule-engine.* 主题)
↓
TbRuleEngineQueueConsumerManager.processMsgs()
↓
TbRuleEngineSubmitStrategy (提交策略)
├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序处理
├─ SEQUENTIAL_BY_TENANT: 按租户顺序处理
└─ BURST: 并发处理
↓
提交消息到Actor系统
↓
AppActor.onQueueToRuleEngineMsg()
↓
TenantActor.onQueueToRuleEngineMsg()
↓
RuleChainActor.onQueueToRuleEngineMsg()
↓
RuleChainActorMessageProcessor.onQueueToRuleEngineMsg()
↓
规则链处理
├─ 获取规则链配置
├─ 遍历规则节点
└─ 执行规则节点逻辑
↓
RuleNode处理
├─ 过滤节点: 判断是否继续
├─ 转换节点: 数据转换
├─ 动作节点: 保存数据、发送告警等
├─ JavaScript节点: 调用tb-js-executor
└─ 外部节点: 调用外部系统
↓
消息传递到下一个节点
├─ 成功关系: 正常流程
├─ 失败关系: 错误处理流程
└─ 其他关系: 条件分支
↓
规则链执行完成
↓
返回结果
├─ 保存到数据库(PostgreSQL/Cassandra)
├─ 发送到Kafka(通知其他服务)
└─ 触发告警、通知等
核心代码模块:
-
TbRuleEngineQueueConsumerManager (
application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java)- 功能: 消费Kafka中的规则引擎消息
- 关键方法:
processMsgs(): 批量处理消息submitMessage(): 提交消息到Actor系统
-
TenantActor (
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java)- 功能: 租户级别消息路由到规则链
- 关键方法:
onQueueToRuleEngineMsg(): 处理规则引擎消息
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { TbMsg tbMsg = msg.getMsg(); if (tbMsg.getRuleChainId() == null) { // 使用根规则链 getRootChainActor().tell(msg); } else { // 使用指定的规则链 ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg); } }
-
RuleChainActor (
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java)- 功能: 规则链级别的Actor,管理规则链执行
- 消息处理: 委托给RuleChainActorMessageProcessor
-
RuleChainActorMessageProcessor (
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java)- 功能: 规则链消息处理的核心逻辑
- 关键方法:
onQueueToRuleEngineMsg(): 处理规则引擎消息onTellNext(): 传递消息到下一个规则节点pushMsgToNode(): 推送消息到规则节点Actor
-
RuleNodeActor (
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java)- 功能: 规则节点级别的Actor,执行单个规则节点逻辑
- 处理流程:
- 接收消息
- 执行规则节点逻辑(Filter、Transform、Action等)
- 根据关系(成功/失败)传递到下一个节点
3.2 规则节点类型处理
3.2.1 过滤节点(Filter Node)
功能: 判断消息是否满足条件,决定是否继续处理
示例:
- 消息类型过滤: 只处理遥测数据
- 脚本过滤: 使用JavaScript判断
代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/
3.2.2 转换节点(Transform Node)
功能: 转换消息数据格式
示例:
- 脚本转换: 使用JavaScript转换数据
- 删除键值: 删除特定字段
代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/
3.2.3 动作节点(Action Node)
功能: 执行具体操作
类型:
- 保存遥测:
SaveTelemetryNode- 保存到数据库 - 保存属性:
SaveAttributesNode- 保存到数据库 - 告警创建:
CreateAlarmNode- 创建告警 - RPC调用:
RpcCallNode- 调用RPC - 发送通知:
SendNotificationNode- 发送通知
代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/
3.2.4 JavaScript节点
功能: 执行JavaScript脚本
流程:
RuleNode (JavaScript节点)
↓
构建JavaScript执行请求
↓
Kafka (tb.js-evaluator.* 主题)
↓
tb-js-executor (执行JavaScript)
↓
返回执行结果
↓
继续规则链处理
代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java
3.3 消息提交策略
位置: application/src/main/java/org/thingsboard/server/service/queue/ruleengine/strategy/
策略类型:
-
SEQUENTIAL_BY_ORIGINATOR: 按设备顺序处理
- 同一设备的消息按顺序处理
- 保证消息的顺序性
-
SEQUENTIAL_BY_TENANT: 按租户顺序处理
- 同一租户的消息按顺序处理
-
BURST: 并发处理
- 不保证顺序,最大化吞吐量
4. 流量出口
4.1 Kafka消息出口
出口点:
- 规则链处理完成后,可能需要发送消息到其他服务
Kafka主题:
tb.core.*: 发送回核心服务(属性更新、RPC请求等)tb.transport.*: 发送到传输服务(推送到设备)
消息类型:
ToCoreMsg: 发送到核心服务ToTransportMsg: 发送到传输服务
4.2 数据库写入出口
出口点: 规则节点执行数据库操作
流量去向:
- PostgreSQL: 告警、通知等实体数据
- Cassandra: 遥测数据、属性数据(如果规则链保存)
关键节点:
SaveTelemetryNode: 保存遥测数据SaveAttributesNode: 保存属性数据CreateAlarmNode: 创建告警记录
4.3 外部系统出口
出口点: 外部集成节点
流量去向:
- HTTP端点(REST API调用)
- MQTT Broker
- 其他外部系统
关键节点:
RestApiCallNode: REST API调用MqttNode: MQTT发布KafkaNode: Kafka发布
5. 核心代码模块详解
5.1 TbRuleEngineQueueConsumerManager
位置: application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java
功能:
- 管理规则引擎的Kafka消费者
- 实现消息批量处理
- 支持多种提交策略
关键方法:
-
processMsgs(): 批量处理消息
protected void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs, ...) { // 获取提交策略 TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue); // 获取处理策略 TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue); submitStrategy.init(msgs); while (!stopped) { // 提交消息处理 submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg)); // 等待处理完成 final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS); // 分析处理结果 TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); if (decision.isCommit()) { consumer.commit(); // 提交Kafka offset break; } } } -
submitMessage(): 提交消息到Actor系统
- 创建消息处理上下文
- 发送到AppActor进行处理
5.2 RuleChainActorMessageProcessor
位置: application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
功能:
- 规则链消息处理的核心逻辑
- 管理规则节点的执行顺序
- 处理消息在规则节点间的传递
关键方法:
-
onQueueToRuleEngineMsg(): 处理规则引擎消息
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) { TbMsg msg = envelope.getMsg(); if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) { onTellNext(msg, true); // 从规则链开始处理 } else { // 从指定的规则节点开始处理 onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), ...); } } -
onTellNext(): 传递消息到下一个节点
private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) { RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null; RuleNodeCtx targetCtx; if (targetId == null) { targetCtx = firstNode; // 从第一个节点开始 } else { targetCtx = nodeActors.get(targetId); // 获取指定的节点 } if (targetCtx != null) { pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE); } } -
pushMsgToNode(): 推送消息到规则节点
- 获取规则节点Actor
- 发送消息到节点Actor
- 处理节点执行结果
5.3 RuleNodeActor
位置: application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java
功能:
- 执行单个规则节点的逻辑
- 处理节点执行结果
- 传递消息到下一个节点
关键处理:
-
节点执行:
private void process(TbMsg msg) { // 获取规则节点配置 RuleNode ruleNode = getNode(); // 创建节点上下文 TbContext ctx = new TbNodeContext(actorContext, msg, ruleNode, ...); // 执行节点逻辑 ruleNode.onMsg(ctx, msg); } -
消息传递:
- 根据节点执行结果(成功/失败)
- 获取对应的关系(Success Relation / Failure Relation)
- 传递到下一个节点
5.4 规则节点实现
位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/
5.4.1 SaveTelemetryNode
功能: 保存遥测数据到数据库
代码位置: rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveTelemetryNode.java
处理流程:
- 解析TbMsg中的遥测数据
- 调用DAO保存到数据库
- 传递消息到下一个节点
5.4.2 CreateAlarmNode
功能: 创建告警
代码位置: rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java
处理流程:
- 检查是否已存在告警
- 创建新的告警记录
- 保存到数据库
5.4.3 TbJavaScriptNode
功能: 执行JavaScript脚本
代码位置: rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java
处理流程:
- 构建JavaScript执行请求
- 发送到Kafka (tb.js-evaluator.*主题)
- 等待tb-js-executor返回结果
- 使用执行结果继续规则链处理
6. 数据流向图
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ Kafka (tb.rule-engine.* 主题) │
│ └─ tb-core 推送的消息 │
│ ├─ 设备遥测数据 (TELEMETRY) │
│ ├─ 属性更新 (ATTRIBUTES_UPDATED) │
│ ├─ 设备事件 (DEVICE_CONNECT/DISCONNECT) │
│ └─ RPC响应 (RPC_CALL_FROM_DEVICE) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消费 │
│ TbRuleEngineQueueConsumerManager.processMsgs() │
│ │
│ 2. 提交策略处理 │
│ ├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序 │
│ ├─ SEQUENTIAL_BY_TENANT: 按租户顺序 │
│ └─ BURST: 并发处理 │
│ │
│ 3. Actor系统路由 │
│ AppActor → TenantActor → RuleChainActor │
│ │
│ 4. 规则链处理 │
│ RuleChainActorMessageProcessor │
│ ├─ 获取规则链配置 │
│ ├─ 遍历规则节点 │
│ └─ 执行规则节点逻辑 │
│ │
│ 5. 规则节点执行 │
│ ├─ 过滤节点: 判断条件 │
│ ├─ 转换节点: 数据转换 │
│ ├─ 动作节点: 保存数据、创建告警等 │
│ ├─ JavaScript节点: 调用tb-js-executor │
│ └─ 外部节点: 调用外部系统 │
│ │
│ 6. 消息传递 │
│ ├─ Success关系 → 下一个节点 │
│ ├─ Failure关系 → 错误处理节点 │
│ └─ 其他关系 → 条件分支 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息 │
│ ├─ tb.core.*: 发送到核心服务 (属性更新、RPC请求) │
│ └─ tb.transport.*: 发送到传输服务 (推送到设备) │
│ │
│ 2. 数据库写入 │
│ ├─ PostgreSQL: 告警、通知等实体数据 │
│ └─ Cassandra: 遥测数据、属性数据 │
│ │
│ 3. 外部系统 │
│ ├─ REST API调用 │
│ ├─ MQTT发布 │
│ └─ Kafka发布 │
│ │
│ 4. JavaScript执行 │
│ └─ tb.js-evaluator.*: 发送到tb-js-executor │
└─────────────────────────────────────────────────────────────────┘
7. 关键配置
7.1 环境变量配置
TB_SERVICE_TYPE=tb-rule-engine: 标识为规则引擎服务ZOOKEEPER_URL=zookeeper:2181: ZooKeeper地址TB_QUEUE_TYPE=kafka: 队列类型TB_KAFKA_SERVERS=kafka:9092: Kafka地址
7.2 核心配置项
queue.rule-engine.poll-interval: Kafka消息轮询间隔queue.rule-engine.consumer-per-partition: 是否每个分区一个消费者queue.rule-engine.pack-processing-timeout: 消息包处理超时时间actors.rule.engine.queue.max_pending_msgs: 队列最大待处理消息数
7.3 规则链配置
规则链配置存储在数据库中(PostgreSQL),包含:
- 规则链元数据
- 规则节点列表
- 节点间的关系(连接)
- 节点配置参数
8. 总结
tb-rule-engine 是 ThingsBoard 的规则执行引擎,负责:
- 消息消费: 从Kafka消费tb-core推送的消息
- 规则链执行: 按照配置的规则链顺序执行规则节点
- 节点处理: 支持过滤、转换、动作等多种节点类型
- JavaScript执行: 调用tb-js-executor执行JavaScript脚本
- 数据持久化: 保存处理结果到数据库
- 消息推送: 发送消息到其他服务(核心服务、传输服务等)
整个流程采用了Actor模型和消息驱动的架构,支持:
- 顺序处理: 保证同一设备/租户的消息顺序
- 并发处理: 最大化吞吐量
- 灵活配置: 通过规则链配置实现业务逻辑
- 高可用性: 支持多实例部署