526 lines
19 KiB
Markdown
526 lines
19 KiB
Markdown
|
|
# tb-rule-engine 流量分析
|
|||
|
|
|
|||
|
|
## 1. 镜像概述
|
|||
|
|
|
|||
|
|
**tb-rule-engine** 是 ThingsBoard 的规则引擎服务,负责:
|
|||
|
|
- 接收来自核心服务的消息
|
|||
|
|
- 执行规则链(Rule Chain)
|
|||
|
|
- 处理规则节点(Rule Node)
|
|||
|
|
- 执行JavaScript脚本(调用tb-js-executor)
|
|||
|
|
- 触发告警、通知等操作
|
|||
|
|
- 保存处理结果到数据库
|
|||
|
|
|
|||
|
|
**镜像名称**: `thingsboard/tb-node`
|
|||
|
|
**服务类型**: 通过环境变量 `TB_SERVICE_TYPE=tb-rule-engine` 区分
|
|||
|
|
**源码位置**: `application/` 目录(与tb-core使用相同代码,通过服务类型区分)
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 2. 流量入口
|
|||
|
|
|
|||
|
|
### 2.1 Kafka 消息入口
|
|||
|
|
|
|||
|
|
**入口点**:
|
|||
|
|
- `application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java`
|
|||
|
|
|
|||
|
|
**Kafka主题**: `tb.rule-engine.*`
|
|||
|
|
|
|||
|
|
**流量来源**:
|
|||
|
|
- **tb-core**: 核心服务推送的设备数据、属性更新、事件等
|
|||
|
|
|
|||
|
|
**消息类型**:
|
|||
|
|
- `ToRuleEngineMsg`: 包含TbMsg消息
|
|||
|
|
- `TbMsg`: 规则引擎处理的消息实体,包含:
|
|||
|
|
- 消息类型(TELEMETRY、ATTRIBUTES_UPDATED、RPC_CALL_FROM_SERVER等)
|
|||
|
|
- 消息数据(JSON格式)
|
|||
|
|
- 元数据(设备ID、租户ID等)
|
|||
|
|
- 规则链ID
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 3. 流量处理流程
|
|||
|
|
|
|||
|
|
### 3.1 完整处理流程
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
Kafka (tb.rule-engine.* 主题)
|
|||
|
|
↓
|
|||
|
|
TbRuleEngineQueueConsumerManager.processMsgs()
|
|||
|
|
↓
|
|||
|
|
TbRuleEngineSubmitStrategy (提交策略)
|
|||
|
|
├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序处理
|
|||
|
|
├─ SEQUENTIAL_BY_TENANT: 按租户顺序处理
|
|||
|
|
└─ BURST: 并发处理
|
|||
|
|
↓
|
|||
|
|
提交消息到Actor系统
|
|||
|
|
↓
|
|||
|
|
AppActor.onQueueToRuleEngineMsg()
|
|||
|
|
↓
|
|||
|
|
TenantActor.onQueueToRuleEngineMsg()
|
|||
|
|
↓
|
|||
|
|
RuleChainActor.onQueueToRuleEngineMsg()
|
|||
|
|
↓
|
|||
|
|
RuleChainActorMessageProcessor.onQueueToRuleEngineMsg()
|
|||
|
|
↓
|
|||
|
|
规则链处理
|
|||
|
|
├─ 获取规则链配置
|
|||
|
|
├─ 遍历规则节点
|
|||
|
|
└─ 执行规则节点逻辑
|
|||
|
|
↓
|
|||
|
|
RuleNode处理
|
|||
|
|
├─ 过滤节点: 判断是否继续
|
|||
|
|
├─ 转换节点: 数据转换
|
|||
|
|
├─ 动作节点: 保存数据、发送告警等
|
|||
|
|
├─ JavaScript节点: 调用tb-js-executor
|
|||
|
|
└─ 外部节点: 调用外部系统
|
|||
|
|
↓
|
|||
|
|
消息传递到下一个节点
|
|||
|
|
├─ 成功关系: 正常流程
|
|||
|
|
├─ 失败关系: 错误处理流程
|
|||
|
|
└─ 其他关系: 条件分支
|
|||
|
|
↓
|
|||
|
|
规则链执行完成
|
|||
|
|
↓
|
|||
|
|
返回结果
|
|||
|
|
├─ 保存到数据库(PostgreSQL/Cassandra)
|
|||
|
|
├─ 发送到Kafka(通知其他服务)
|
|||
|
|
└─ 触发告警、通知等
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**核心代码模块**:
|
|||
|
|
|
|||
|
|
1. **TbRuleEngineQueueConsumerManager** (`application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java`)
|
|||
|
|
- **功能**: 消费Kafka中的规则引擎消息
|
|||
|
|
- **关键方法**:
|
|||
|
|
- `processMsgs()`: 批量处理消息
|
|||
|
|
- `submitMessage()`: 提交消息到Actor系统
|
|||
|
|
|
|||
|
|
2. **TenantActor** (`application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`)
|
|||
|
|
- **功能**: 租户级别消息路由到规则链
|
|||
|
|
- **关键方法**:
|
|||
|
|
- `onQueueToRuleEngineMsg()`: 处理规则引擎消息
|
|||
|
|
```java
|
|||
|
|
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
|
|||
|
|
TbMsg tbMsg = msg.getMsg();
|
|||
|
|
if (tbMsg.getRuleChainId() == null) {
|
|||
|
|
// 使用根规则链
|
|||
|
|
getRootChainActor().tell(msg);
|
|||
|
|
} else {
|
|||
|
|
// 使用指定的规则链
|
|||
|
|
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
3. **RuleChainActor** (`application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java`)
|
|||
|
|
- **功能**: 规则链级别的Actor,管理规则链执行
|
|||
|
|
- **消息处理**: 委托给RuleChainActorMessageProcessor
|
|||
|
|
|
|||
|
|
4. **RuleChainActorMessageProcessor** (`application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java`)
|
|||
|
|
- **功能**: 规则链消息处理的核心逻辑
|
|||
|
|
- **关键方法**:
|
|||
|
|
- `onQueueToRuleEngineMsg()`: 处理规则引擎消息
|
|||
|
|
- `onTellNext()`: 传递消息到下一个规则节点
|
|||
|
|
- `pushMsgToNode()`: 推送消息到规则节点Actor
|
|||
|
|
|
|||
|
|
5. **RuleNodeActor** (`application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java`)
|
|||
|
|
- **功能**: 规则节点级别的Actor,执行单个规则节点逻辑
|
|||
|
|
- **处理流程**:
|
|||
|
|
- 接收消息
|
|||
|
|
- 执行规则节点逻辑(Filter、Transform、Action等)
|
|||
|
|
- 根据关系(成功/失败)传递到下一个节点
|
|||
|
|
|
|||
|
|
### 3.2 规则节点类型处理
|
|||
|
|
|
|||
|
|
#### 3.2.1 过滤节点(Filter Node)
|
|||
|
|
|
|||
|
|
**功能**: 判断消息是否满足条件,决定是否继续处理
|
|||
|
|
|
|||
|
|
**示例**:
|
|||
|
|
- **消息类型过滤**: 只处理遥测数据
|
|||
|
|
- **脚本过滤**: 使用JavaScript判断
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/`
|
|||
|
|
|
|||
|
|
#### 3.2.2 转换节点(Transform Node)
|
|||
|
|
|
|||
|
|
**功能**: 转换消息数据格式
|
|||
|
|
|
|||
|
|
**示例**:
|
|||
|
|
- **脚本转换**: 使用JavaScript转换数据
|
|||
|
|
- **删除键值**: 删除特定字段
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/`
|
|||
|
|
|
|||
|
|
#### 3.2.3 动作节点(Action Node)
|
|||
|
|
|
|||
|
|
**功能**: 执行具体操作
|
|||
|
|
|
|||
|
|
**类型**:
|
|||
|
|
- **保存遥测**: `SaveTelemetryNode` - 保存到数据库
|
|||
|
|
- **保存属性**: `SaveAttributesNode` - 保存到数据库
|
|||
|
|
- **告警创建**: `CreateAlarmNode` - 创建告警
|
|||
|
|
- **RPC调用**: `RpcCallNode` - 调用RPC
|
|||
|
|
- **发送通知**: `SendNotificationNode` - 发送通知
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/`
|
|||
|
|
|
|||
|
|
#### 3.2.4 JavaScript节点
|
|||
|
|
|
|||
|
|
**功能**: 执行JavaScript脚本
|
|||
|
|
|
|||
|
|
**流程**:
|
|||
|
|
```
|
|||
|
|
RuleNode (JavaScript节点)
|
|||
|
|
↓
|
|||
|
|
构建JavaScript执行请求
|
|||
|
|
↓
|
|||
|
|
Kafka (tb.js-evaluator.* 主题)
|
|||
|
|
↓
|
|||
|
|
tb-js-executor (执行JavaScript)
|
|||
|
|
↓
|
|||
|
|
返回执行结果
|
|||
|
|
↓
|
|||
|
|
继续规则链处理
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java`
|
|||
|
|
|
|||
|
|
### 3.3 消息提交策略
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/ruleengine/strategy/`
|
|||
|
|
|
|||
|
|
**策略类型**:
|
|||
|
|
|
|||
|
|
1. **SEQUENTIAL_BY_ORIGINATOR**: 按设备顺序处理
|
|||
|
|
- 同一设备的消息按顺序处理
|
|||
|
|
- 保证消息的顺序性
|
|||
|
|
|
|||
|
|
2. **SEQUENTIAL_BY_TENANT**: 按租户顺序处理
|
|||
|
|
- 同一租户的消息按顺序处理
|
|||
|
|
|
|||
|
|
3. **BURST**: 并发处理
|
|||
|
|
- 不保证顺序,最大化吞吐量
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 4. 流量出口
|
|||
|
|
|
|||
|
|
### 4.1 Kafka消息出口
|
|||
|
|
|
|||
|
|
**出口点**:
|
|||
|
|
- 规则链处理完成后,可能需要发送消息到其他服务
|
|||
|
|
|
|||
|
|
**Kafka主题**:
|
|||
|
|
- `tb.core.*`: 发送回核心服务(属性更新、RPC请求等)
|
|||
|
|
- `tb.transport.*`: 发送到传输服务(推送到设备)
|
|||
|
|
|
|||
|
|
**消息类型**:
|
|||
|
|
- `ToCoreMsg`: 发送到核心服务
|
|||
|
|
- `ToTransportMsg`: 发送到传输服务
|
|||
|
|
|
|||
|
|
### 4.2 数据库写入出口
|
|||
|
|
|
|||
|
|
**出口点**: 规则节点执行数据库操作
|
|||
|
|
|
|||
|
|
**流量去向**:
|
|||
|
|
- **PostgreSQL**: 告警、通知等实体数据
|
|||
|
|
- **Cassandra**: 遥测数据、属性数据(如果规则链保存)
|
|||
|
|
|
|||
|
|
**关键节点**:
|
|||
|
|
- `SaveTelemetryNode`: 保存遥测数据
|
|||
|
|
- `SaveAttributesNode`: 保存属性数据
|
|||
|
|
- `CreateAlarmNode`: 创建告警记录
|
|||
|
|
|
|||
|
|
### 4.3 外部系统出口
|
|||
|
|
|
|||
|
|
**出口点**: 外部集成节点
|
|||
|
|
|
|||
|
|
**流量去向**:
|
|||
|
|
- HTTP端点(REST API调用)
|
|||
|
|
- MQTT Broker
|
|||
|
|
- 其他外部系统
|
|||
|
|
|
|||
|
|
**关键节点**:
|
|||
|
|
- `RestApiCallNode`: REST API调用
|
|||
|
|
- `MqttNode`: MQTT发布
|
|||
|
|
- `KafkaNode`: Kafka发布
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 5. 核心代码模块详解
|
|||
|
|
|
|||
|
|
### 5.1 TbRuleEngineQueueConsumerManager
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java`
|
|||
|
|
|
|||
|
|
**功能**:
|
|||
|
|
- 管理规则引擎的Kafka消费者
|
|||
|
|
- 实现消息批量处理
|
|||
|
|
- 支持多种提交策略
|
|||
|
|
|
|||
|
|
**关键方法**:
|
|||
|
|
|
|||
|
|
1. **processMsgs()**: 批量处理消息
|
|||
|
|
```java
|
|||
|
|
protected void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs, ...) {
|
|||
|
|
// 获取提交策略
|
|||
|
|
TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue);
|
|||
|
|
// 获取处理策略
|
|||
|
|
TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue);
|
|||
|
|
|
|||
|
|
submitStrategy.init(msgs);
|
|||
|
|
while (!stopped) {
|
|||
|
|
// 提交消息处理
|
|||
|
|
submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg));
|
|||
|
|
|
|||
|
|
// 等待处理完成
|
|||
|
|
final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
|
|||
|
|
|
|||
|
|
// 分析处理结果
|
|||
|
|
TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
|
|||
|
|
|
|||
|
|
if (decision.isCommit()) {
|
|||
|
|
consumer.commit(); // 提交Kafka offset
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
2. **submitMessage()**: 提交消息到Actor系统
|
|||
|
|
- 创建消息处理上下文
|
|||
|
|
- 发送到AppActor进行处理
|
|||
|
|
|
|||
|
|
### 5.2 RuleChainActorMessageProcessor
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java`
|
|||
|
|
|
|||
|
|
**功能**:
|
|||
|
|
- 规则链消息处理的核心逻辑
|
|||
|
|
- 管理规则节点的执行顺序
|
|||
|
|
- 处理消息在规则节点间的传递
|
|||
|
|
|
|||
|
|
**关键方法**:
|
|||
|
|
|
|||
|
|
1. **onQueueToRuleEngineMsg()**: 处理规则引擎消息
|
|||
|
|
```java
|
|||
|
|
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
|
|||
|
|
TbMsg msg = envelope.getMsg();
|
|||
|
|
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
|
|||
|
|
onTellNext(msg, true); // 从规则链开始处理
|
|||
|
|
} else {
|
|||
|
|
// 从指定的规则节点开始处理
|
|||
|
|
onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), ...);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
2. **onTellNext()**: 传递消息到下一个节点
|
|||
|
|
```java
|
|||
|
|
private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {
|
|||
|
|
RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;
|
|||
|
|
RuleNodeCtx targetCtx;
|
|||
|
|
if (targetId == null) {
|
|||
|
|
targetCtx = firstNode; // 从第一个节点开始
|
|||
|
|
} else {
|
|||
|
|
targetCtx = nodeActors.get(targetId); // 获取指定的节点
|
|||
|
|
}
|
|||
|
|
if (targetCtx != null) {
|
|||
|
|
pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
3. **pushMsgToNode()**: 推送消息到规则节点
|
|||
|
|
- 获取规则节点Actor
|
|||
|
|
- 发送消息到节点Actor
|
|||
|
|
- 处理节点执行结果
|
|||
|
|
|
|||
|
|
### 5.3 RuleNodeActor
|
|||
|
|
|
|||
|
|
**位置**: `application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java`
|
|||
|
|
|
|||
|
|
**功能**:
|
|||
|
|
- 执行单个规则节点的逻辑
|
|||
|
|
- 处理节点执行结果
|
|||
|
|
- 传递消息到下一个节点
|
|||
|
|
|
|||
|
|
**关键处理**:
|
|||
|
|
|
|||
|
|
1. **节点执行**:
|
|||
|
|
```java
|
|||
|
|
private void process(TbMsg msg) {
|
|||
|
|
// 获取规则节点配置
|
|||
|
|
RuleNode ruleNode = getNode();
|
|||
|
|
|
|||
|
|
// 创建节点上下文
|
|||
|
|
TbContext ctx = new TbNodeContext(actorContext, msg, ruleNode, ...);
|
|||
|
|
|
|||
|
|
// 执行节点逻辑
|
|||
|
|
ruleNode.onMsg(ctx, msg);
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
2. **消息传递**:
|
|||
|
|
- 根据节点执行结果(成功/失败)
|
|||
|
|
- 获取对应的关系(Success Relation / Failure Relation)
|
|||
|
|
- 传递到下一个节点
|
|||
|
|
|
|||
|
|
### 5.4 规则节点实现
|
|||
|
|
|
|||
|
|
**位置**: `rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/`
|
|||
|
|
|
|||
|
|
#### 5.4.1 SaveTelemetryNode
|
|||
|
|
|
|||
|
|
**功能**: 保存遥测数据到数据库
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveTelemetryNode.java`
|
|||
|
|
|
|||
|
|
**处理流程**:
|
|||
|
|
1. 解析TbMsg中的遥测数据
|
|||
|
|
2. 调用DAO保存到数据库
|
|||
|
|
3. 传递消息到下一个节点
|
|||
|
|
|
|||
|
|
#### 5.4.2 CreateAlarmNode
|
|||
|
|
|
|||
|
|
**功能**: 创建告警
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java`
|
|||
|
|
|
|||
|
|
**处理流程**:
|
|||
|
|
1. 检查是否已存在告警
|
|||
|
|
2. 创建新的告警记录
|
|||
|
|
3. 保存到数据库
|
|||
|
|
|
|||
|
|
#### 5.4.3 TbJavaScriptNode
|
|||
|
|
|
|||
|
|
**功能**: 执行JavaScript脚本
|
|||
|
|
|
|||
|
|
**代码位置**: `rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java`
|
|||
|
|
|
|||
|
|
**处理流程**:
|
|||
|
|
1. 构建JavaScript执行请求
|
|||
|
|
2. 发送到Kafka (tb.js-evaluator.*主题)
|
|||
|
|
3. 等待tb-js-executor返回结果
|
|||
|
|
4. 使用执行结果继续规则链处理
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 6. 数据流向图
|
|||
|
|
|
|||
|
|
```
|
|||
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|||
|
|
│ 流量入口 │
|
|||
|
|
├─────────────────────────────────────────────────────────────────┤
|
|||
|
|
│ Kafka (tb.rule-engine.* 主题) │
|
|||
|
|
│ └─ tb-core 推送的消息 │
|
|||
|
|
│ ├─ 设备遥测数据 (TELEMETRY) │
|
|||
|
|
│ ├─ 属性更新 (ATTRIBUTES_UPDATED) │
|
|||
|
|
│ ├─ 设备事件 (DEVICE_CONNECT/DISCONNECT) │
|
|||
|
|
│ └─ RPC响应 (RPC_CALL_FROM_DEVICE) │
|
|||
|
|
└─────────────────────────────────────────────────────────────────┘
|
|||
|
|
↓
|
|||
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|||
|
|
│ 流量处理 │
|
|||
|
|
├─────────────────────────────────────────────────────────────────┤
|
|||
|
|
│ 1. Kafka消费 │
|
|||
|
|
│ TbRuleEngineQueueConsumerManager.processMsgs() │
|
|||
|
|
│ │
|
|||
|
|
│ 2. 提交策略处理 │
|
|||
|
|
│ ├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序 │
|
|||
|
|
│ ├─ SEQUENTIAL_BY_TENANT: 按租户顺序 │
|
|||
|
|
│ └─ BURST: 并发处理 │
|
|||
|
|
│ │
|
|||
|
|
│ 3. Actor系统路由 │
|
|||
|
|
│ AppActor → TenantActor → RuleChainActor │
|
|||
|
|
│ │
|
|||
|
|
│ 4. 规则链处理 │
|
|||
|
|
│ RuleChainActorMessageProcessor │
|
|||
|
|
│ ├─ 获取规则链配置 │
|
|||
|
|
│ ├─ 遍历规则节点 │
|
|||
|
|
│ └─ 执行规则节点逻辑 │
|
|||
|
|
│ │
|
|||
|
|
│ 5. 规则节点执行 │
|
|||
|
|
│ ├─ 过滤节点: 判断条件 │
|
|||
|
|
│ ├─ 转换节点: 数据转换 │
|
|||
|
|
│ ├─ 动作节点: 保存数据、创建告警等 │
|
|||
|
|
│ ├─ JavaScript节点: 调用tb-js-executor │
|
|||
|
|
│ └─ 外部节点: 调用外部系统 │
|
|||
|
|
│ │
|
|||
|
|
│ 6. 消息传递 │
|
|||
|
|
│ ├─ Success关系 → 下一个节点 │
|
|||
|
|
│ ├─ Failure关系 → 错误处理节点 │
|
|||
|
|
│ └─ 其他关系 → 条件分支 │
|
|||
|
|
└─────────────────────────────────────────────────────────────────┘
|
|||
|
|
↓
|
|||
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|||
|
|
│ 流量出口 │
|
|||
|
|
├─────────────────────────────────────────────────────────────────┤
|
|||
|
|
│ 1. Kafka消息 │
|
|||
|
|
│ ├─ tb.core.*: 发送到核心服务 (属性更新、RPC请求) │
|
|||
|
|
│ └─ tb.transport.*: 发送到传输服务 (推送到设备) │
|
|||
|
|
│ │
|
|||
|
|
│ 2. 数据库写入 │
|
|||
|
|
│ ├─ PostgreSQL: 告警、通知等实体数据 │
|
|||
|
|
│ └─ Cassandra: 遥测数据、属性数据 │
|
|||
|
|
│ │
|
|||
|
|
│ 3. 外部系统 │
|
|||
|
|
│ ├─ REST API调用 │
|
|||
|
|
│ ├─ MQTT发布 │
|
|||
|
|
│ └─ Kafka发布 │
|
|||
|
|
│ │
|
|||
|
|
│ 4. JavaScript执行 │
|
|||
|
|
│ └─ tb.js-evaluator.*: 发送到tb-js-executor │
|
|||
|
|
└─────────────────────────────────────────────────────────────────┘
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 7. 关键配置
|
|||
|
|
|
|||
|
|
### 7.1 环境变量配置
|
|||
|
|
|
|||
|
|
- `TB_SERVICE_TYPE=tb-rule-engine`: 标识为规则引擎服务
|
|||
|
|
- `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址
|
|||
|
|
- `TB_QUEUE_TYPE=kafka`: 队列类型
|
|||
|
|
- `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址
|
|||
|
|
|
|||
|
|
### 7.2 核心配置项
|
|||
|
|
|
|||
|
|
- `queue.rule-engine.poll-interval`: Kafka消息轮询间隔
|
|||
|
|
- `queue.rule-engine.consumer-per-partition`: 是否每个分区一个消费者
|
|||
|
|
- `queue.rule-engine.pack-processing-timeout`: 消息包处理超时时间
|
|||
|
|
- `actors.rule.engine.queue.max_pending_msgs`: 队列最大待处理消息数
|
|||
|
|
|
|||
|
|
### 7.3 规则链配置
|
|||
|
|
|
|||
|
|
规则链配置存储在数据库中(PostgreSQL),包含:
|
|||
|
|
- 规则链元数据
|
|||
|
|
- 规则节点列表
|
|||
|
|
- 节点间的关系(连接)
|
|||
|
|
- 节点配置参数
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
## 8. 总结
|
|||
|
|
|
|||
|
|
**tb-rule-engine** 是 ThingsBoard 的规则执行引擎,负责:
|
|||
|
|
|
|||
|
|
1. **消息消费**: 从Kafka消费tb-core推送的消息
|
|||
|
|
2. **规则链执行**: 按照配置的规则链顺序执行规则节点
|
|||
|
|
3. **节点处理**: 支持过滤、转换、动作等多种节点类型
|
|||
|
|
4. **JavaScript执行**: 调用tb-js-executor执行JavaScript脚本
|
|||
|
|
5. **数据持久化**: 保存处理结果到数据库
|
|||
|
|
6. **消息推送**: 发送消息到其他服务(核心服务、传输服务等)
|
|||
|
|
|
|||
|
|
整个流程采用了**Actor模型**和**消息驱动**的架构,支持:
|
|||
|
|
- **顺序处理**: 保证同一设备/租户的消息顺序
|
|||
|
|
- **并发处理**: 最大化吞吐量
|
|||
|
|
- **灵活配置**: 通过规则链配置实现业务逻辑
|
|||
|
|
- **高可用性**: 支持多实例部署
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|