thingsboard/summary/镜像流量分析/02-tb-rule-engine流量分析.md

526 lines
19 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# tb-rule-engine 流量分析
## 1. 镜像概述
**tb-rule-engine** 是 ThingsBoard 的规则引擎服务,负责:
- 接收来自核心服务的消息
- 执行规则链Rule Chain
- 处理规则节点Rule Node
- 执行JavaScript脚本调用tb-js-executor
- 触发告警、通知等操作
- 保存处理结果到数据库
**镜像名称**: `thingsboard/tb-node`
**服务类型**: 通过环境变量 `TB_SERVICE_TYPE=tb-rule-engine` 区分
**源码位置**: `application/` 目录与tb-core使用相同代码通过服务类型区分
---
## 2. 流量入口
### 2.1 Kafka 消息入口
**入口点**:
- `application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java`
**Kafka主题**: `tb.rule-engine.*`
**流量来源**:
- **tb-core**: 核心服务推送的设备数据、属性更新、事件等
**消息类型**:
- `ToRuleEngineMsg`: 包含TbMsg消息
- `TbMsg`: 规则引擎处理的消息实体,包含:
- 消息类型TELEMETRY、ATTRIBUTES_UPDATED、RPC_CALL_FROM_SERVER等
- 消息数据JSON格式
- 元数据设备ID、租户ID等
- 规则链ID
---
## 3. 流量处理流程
### 3.1 完整处理流程
```
Kafka (tb.rule-engine.* 主题)
TbRuleEngineQueueConsumerManager.processMsgs()
TbRuleEngineSubmitStrategy (提交策略)
├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序处理
├─ SEQUENTIAL_BY_TENANT: 按租户顺序处理
└─ BURST: 并发处理
提交消息到Actor系统
AppActor.onQueueToRuleEngineMsg()
TenantActor.onQueueToRuleEngineMsg()
RuleChainActor.onQueueToRuleEngineMsg()
RuleChainActorMessageProcessor.onQueueToRuleEngineMsg()
规则链处理
├─ 获取规则链配置
├─ 遍历规则节点
└─ 执行规则节点逻辑
RuleNode处理
├─ 过滤节点: 判断是否继续
├─ 转换节点: 数据转换
├─ 动作节点: 保存数据、发送告警等
├─ JavaScript节点: 调用tb-js-executor
└─ 外部节点: 调用外部系统
消息传递到下一个节点
├─ 成功关系: 正常流程
├─ 失败关系: 错误处理流程
└─ 其他关系: 条件分支
规则链执行完成
返回结果
├─ 保存到数据库PostgreSQL/Cassandra
├─ 发送到Kafka通知其他服务
└─ 触发告警、通知等
```
**核心代码模块**:
1. **TbRuleEngineQueueConsumerManager** (`application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java`)
- **功能**: 消费Kafka中的规则引擎消息
- **关键方法**:
- `processMsgs()`: 批量处理消息
- `submitMessage()`: 提交消息到Actor系统
2. **TenantActor** (`application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`)
- **功能**: 租户级别消息路由到规则链
- **关键方法**:
- `onQueueToRuleEngineMsg()`: 处理规则引擎消息
```java
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
TbMsg tbMsg = msg.getMsg();
if (tbMsg.getRuleChainId() == null) {
// 使用根规则链
getRootChainActor().tell(msg);
} else {
// 使用指定的规则链
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
}
}
```
3. **RuleChainActor** (`application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java`)
- **功能**: 规则链级别的Actor管理规则链执行
- **消息处理**: 委托给RuleChainActorMessageProcessor
4. **RuleChainActorMessageProcessor** (`application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java`)
- **功能**: 规则链消息处理的核心逻辑
- **关键方法**:
- `onQueueToRuleEngineMsg()`: 处理规则引擎消息
- `onTellNext()`: 传递消息到下一个规则节点
- `pushMsgToNode()`: 推送消息到规则节点Actor
5. **RuleNodeActor** (`application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java`)
- **功能**: 规则节点级别的Actor执行单个规则节点逻辑
- **处理流程**:
- 接收消息
- 执行规则节点逻辑Filter、Transform、Action等
- 根据关系(成功/失败)传递到下一个节点
### 3.2 规则节点类型处理
#### 3.2.1 过滤节点Filter Node
**功能**: 判断消息是否满足条件,决定是否继续处理
**示例**:
- **消息类型过滤**: 只处理遥测数据
- **脚本过滤**: 使用JavaScript判断
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/`
#### 3.2.2 转换节点Transform Node
**功能**: 转换消息数据格式
**示例**:
- **脚本转换**: 使用JavaScript转换数据
- **删除键值**: 删除特定字段
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/`
#### 3.2.3 动作节点Action Node
**功能**: 执行具体操作
**类型**:
- **保存遥测**: `SaveTelemetryNode` - 保存到数据库
- **保存属性**: `SaveAttributesNode` - 保存到数据库
- **告警创建**: `CreateAlarmNode` - 创建告警
- **RPC调用**: `RpcCallNode` - 调用RPC
- **发送通知**: `SendNotificationNode` - 发送通知
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/`
#### 3.2.4 JavaScript节点
**功能**: 执行JavaScript脚本
**流程**:
```
RuleNode (JavaScript节点)
构建JavaScript执行请求
Kafka (tb.js-evaluator.* 主题)
tb-js-executor (执行JavaScript)
返回执行结果
继续规则链处理
```
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java`
### 3.3 消息提交策略
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/ruleengine/strategy/`
**策略类型**:
1. **SEQUENTIAL_BY_ORIGINATOR**: 按设备顺序处理
- 同一设备的消息按顺序处理
- 保证消息的顺序性
2. **SEQUENTIAL_BY_TENANT**: 按租户顺序处理
- 同一租户的消息按顺序处理
3. **BURST**: 并发处理
- 不保证顺序,最大化吞吐量
---
## 4. 流量出口
### 4.1 Kafka消息出口
**出口点**:
- 规则链处理完成后,可能需要发送消息到其他服务
**Kafka主题**:
- `tb.core.*`: 发送回核心服务属性更新、RPC请求等
- `tb.transport.*`: 发送到传输服务(推送到设备)
**消息类型**:
- `ToCoreMsg`: 发送到核心服务
- `ToTransportMsg`: 发送到传输服务
### 4.2 数据库写入出口
**出口点**: 规则节点执行数据库操作
**流量去向**:
- **PostgreSQL**: 告警、通知等实体数据
- **Cassandra**: 遥测数据、属性数据(如果规则链保存)
**关键节点**:
- `SaveTelemetryNode`: 保存遥测数据
- `SaveAttributesNode`: 保存属性数据
- `CreateAlarmNode`: 创建告警记录
### 4.3 外部系统出口
**出口点**: 外部集成节点
**流量去向**:
- HTTP端点REST API调用
- MQTT Broker
- 其他外部系统
**关键节点**:
- `RestApiCallNode`: REST API调用
- `MqttNode`: MQTT发布
- `KafkaNode`: Kafka发布
---
## 5. 核心代码模块详解
### 5.1 TbRuleEngineQueueConsumerManager
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java`
**功能**:
- 管理规则引擎的Kafka消费者
- 实现消息批量处理
- 支持多种提交策略
**关键方法**:
1. **processMsgs()**: 批量处理消息
```java
protected void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs, ...) {
// 获取提交策略
TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue);
// 获取处理策略
TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue);
submitStrategy.init(msgs);
while (!stopped) {
// 提交消息处理
submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg));
// 等待处理完成
final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
// 分析处理结果
TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
if (decision.isCommit()) {
consumer.commit(); // 提交Kafka offset
break;
}
}
}
```
2. **submitMessage()**: 提交消息到Actor系统
- 创建消息处理上下文
- 发送到AppActor进行处理
### 5.2 RuleChainActorMessageProcessor
**位置**: `application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java`
**功能**:
- 规则链消息处理的核心逻辑
- 管理规则节点的执行顺序
- 处理消息在规则节点间的传递
**关键方法**:
1. **onQueueToRuleEngineMsg()**: 处理规则引擎消息
```java
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getMsg();
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
onTellNext(msg, true); // 从规则链开始处理
} else {
// 从指定的规则节点开始处理
onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), ...);
}
}
```
2. **onTellNext()**: 传递消息到下一个节点
```java
private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {
RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;
RuleNodeCtx targetCtx;
if (targetId == null) {
targetCtx = firstNode; // 从第一个节点开始
} else {
targetCtx = nodeActors.get(targetId); // 获取指定的节点
}
if (targetCtx != null) {
pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);
}
}
```
3. **pushMsgToNode()**: 推送消息到规则节点
- 获取规则节点Actor
- 发送消息到节点Actor
- 处理节点执行结果
### 5.3 RuleNodeActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java`
**功能**:
- 执行单个规则节点的逻辑
- 处理节点执行结果
- 传递消息到下一个节点
**关键处理**:
1. **节点执行**:
```java
private void process(TbMsg msg) {
// 获取规则节点配置
RuleNode ruleNode = getNode();
// 创建节点上下文
TbContext ctx = new TbNodeContext(actorContext, msg, ruleNode, ...);
// 执行节点逻辑
ruleNode.onMsg(ctx, msg);
}
```
2. **消息传递**:
- 根据节点执行结果(成功/失败)
- 获取对应的关系Success Relation / Failure Relation
- 传递到下一个节点
### 5.4 规则节点实现
**位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/`
#### 5.4.1 SaveTelemetryNode
**功能**: 保存遥测数据到数据库
**代码位置**: `rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveTelemetryNode.java`
**处理流程**:
1. 解析TbMsg中的遥测数据
2. 调用DAO保存到数据库
3. 传递消息到下一个节点
#### 5.4.2 CreateAlarmNode
**功能**: 创建告警
**代码位置**: `rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java`
**处理流程**:
1. 检查是否已存在告警
2. 创建新的告警记录
3. 保存到数据库
#### 5.4.3 TbJavaScriptNode
**功能**: 执行JavaScript脚本
**代码位置**: `rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java`
**处理流程**:
1. 构建JavaScript执行请求
2. 发送到Kafka (tb.js-evaluator.*主题)
3. 等待tb-js-executor返回结果
4. 使用执行结果继续规则链处理
---
## 6. 数据流向图
```
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ Kafka (tb.rule-engine.* 主题) │
│ └─ tb-core 推送的消息 │
│ ├─ 设备遥测数据 (TELEMETRY) │
│ ├─ 属性更新 (ATTRIBUTES_UPDATED) │
│ ├─ 设备事件 (DEVICE_CONNECT/DISCONNECT) │
│ └─ RPC响应 (RPC_CALL_FROM_DEVICE) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消费 │
│ TbRuleEngineQueueConsumerManager.processMsgs() │
│ │
│ 2. 提交策略处理 │
│ ├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序 │
│ ├─ SEQUENTIAL_BY_TENANT: 按租户顺序 │
│ └─ BURST: 并发处理 │
│ │
│ 3. Actor系统路由 │
│ AppActor → TenantActor → RuleChainActor │
│ │
│ 4. 规则链处理 │
│ RuleChainActorMessageProcessor │
│ ├─ 获取规则链配置 │
│ ├─ 遍历规则节点 │
│ └─ 执行规则节点逻辑 │
│ │
│ 5. 规则节点执行 │
│ ├─ 过滤节点: 判断条件 │
│ ├─ 转换节点: 数据转换 │
│ ├─ 动作节点: 保存数据、创建告警等 │
│ ├─ JavaScript节点: 调用tb-js-executor │
│ └─ 外部节点: 调用外部系统 │
│ │
│ 6. 消息传递 │
│ ├─ Success关系 → 下一个节点 │
│ ├─ Failure关系 → 错误处理节点 │
│ └─ 其他关系 → 条件分支 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息 │
│ ├─ tb.core.*: 发送到核心服务 (属性更新、RPC请求) │
│ └─ tb.transport.*: 发送到传输服务 (推送到设备) │
│ │
│ 2. 数据库写入 │
│ ├─ PostgreSQL: 告警、通知等实体数据 │
│ └─ Cassandra: 遥测数据、属性数据 │
│ │
│ 3. 外部系统 │
│ ├─ REST API调用 │
│ ├─ MQTT发布 │
│ └─ Kafka发布 │
│ │
│ 4. JavaScript执行 │
│ └─ tb.js-evaluator.*: 发送到tb-js-executor │
└─────────────────────────────────────────────────────────────────┘
```
---
## 7. 关键配置
### 7.1 环境变量配置
- `TB_SERVICE_TYPE=tb-rule-engine`: 标识为规则引擎服务
- `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址
- `TB_QUEUE_TYPE=kafka`: 队列类型
- `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址
### 7.2 核心配置项
- `queue.rule-engine.poll-interval`: Kafka消息轮询间隔
- `queue.rule-engine.consumer-per-partition`: 是否每个分区一个消费者
- `queue.rule-engine.pack-processing-timeout`: 消息包处理超时时间
- `actors.rule.engine.queue.max_pending_msgs`: 队列最大待处理消息数
### 7.3 规则链配置
规则链配置存储在数据库中PostgreSQL包含
- 规则链元数据
- 规则节点列表
- 节点间的关系(连接)
- 节点配置参数
---
## 8. 总结
**tb-rule-engine** 是 ThingsBoard 的规则执行引擎,负责:
1. **消息消费**: 从Kafka消费tb-core推送的消息
2. **规则链执行**: 按照配置的规则链顺序执行规则节点
3. **节点处理**: 支持过滤、转换、动作等多种节点类型
4. **JavaScript执行**: 调用tb-js-executor执行JavaScript脚本
5. **数据持久化**: 保存处理结果到数据库
6. **消息推送**: 发送消息到其他服务(核心服务、传输服务等)
整个流程采用了**Actor模型**和**消息驱动**的架构,支持:
- **顺序处理**: 保证同一设备/租户的消息顺序
- **并发处理**: 最大化吞吐量
- **灵活配置**: 通过规则链配置实现业务逻辑
- **高可用性**: 支持多实例部署