thingsboard/summary/镜像流量分析/02-tb-rule-engine流量分析.md

19 KiB
Raw Permalink Blame History

tb-rule-engine 流量分析

1. 镜像概述

tb-rule-engine 是 ThingsBoard 的规则引擎服务,负责:

  • 接收来自核心服务的消息
  • 执行规则链Rule Chain
  • 处理规则节点Rule Node
  • 执行JavaScript脚本调用tb-js-executor
  • 触发告警、通知等操作
  • 保存处理结果到数据库

镜像名称: thingsboard/tb-node
服务类型: 通过环境变量 TB_SERVICE_TYPE=tb-rule-engine 区分
源码位置: application/ 目录与tb-core使用相同代码通过服务类型区分


2. 流量入口

2.1 Kafka 消息入口

入口点:

  • application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java

Kafka主题: tb.rule-engine.*

流量来源:

  • tb-core: 核心服务推送的设备数据、属性更新、事件等

消息类型:

  • ToRuleEngineMsg: 包含TbMsg消息
  • TbMsg: 规则引擎处理的消息实体,包含:
    • 消息类型TELEMETRY、ATTRIBUTES_UPDATED、RPC_CALL_FROM_SERVER等
    • 消息数据JSON格式
    • 元数据设备ID、租户ID等
    • 规则链ID

3. 流量处理流程

3.1 完整处理流程

Kafka (tb.rule-engine.* 主题)
  ↓
TbRuleEngineQueueConsumerManager.processMsgs()
  ↓
TbRuleEngineSubmitStrategy (提交策略)
  ├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序处理
  ├─ SEQUENTIAL_BY_TENANT: 按租户顺序处理
  └─ BURST: 并发处理
  ↓
提交消息到Actor系统
  ↓
AppActor.onQueueToRuleEngineMsg()
  ↓
TenantActor.onQueueToRuleEngineMsg()
  ↓
RuleChainActor.onQueueToRuleEngineMsg()
  ↓
RuleChainActorMessageProcessor.onQueueToRuleEngineMsg()
  ↓
规则链处理
  ├─ 获取规则链配置
  ├─ 遍历规则节点
  └─ 执行规则节点逻辑
  ↓
RuleNode处理
  ├─ 过滤节点: 判断是否继续
  ├─ 转换节点: 数据转换
  ├─ 动作节点: 保存数据、发送告警等
  ├─ JavaScript节点: 调用tb-js-executor
  └─ 外部节点: 调用外部系统
  ↓
消息传递到下一个节点
  ├─ 成功关系: 正常流程
  ├─ 失败关系: 错误处理流程
  └─ 其他关系: 条件分支
  ↓
规则链执行完成
  ↓
返回结果
  ├─ 保存到数据库PostgreSQL/Cassandra
  ├─ 发送到Kafka通知其他服务
  └─ 触发告警、通知等

核心代码模块:

  1. TbRuleEngineQueueConsumerManager (application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java)

    • 功能: 消费Kafka中的规则引擎消息
    • 关键方法:
      • processMsgs(): 批量处理消息
      • submitMessage(): 提交消息到Actor系统
  2. TenantActor (application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java)

    • 功能: 租户级别消息路由到规则链
    • 关键方法:
      • onQueueToRuleEngineMsg(): 处理规则引擎消息
      private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
          TbMsg tbMsg = msg.getMsg();
          if (tbMsg.getRuleChainId() == null) {
              // 使用根规则链
              getRootChainActor().tell(msg);
          } else {
              // 使用指定的规则链
              ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
          }
      }
      
  3. RuleChainActor (application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java)

    • 功能: 规则链级别的Actor管理规则链执行
    • 消息处理: 委托给RuleChainActorMessageProcessor
  4. RuleChainActorMessageProcessor (application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java)

    • 功能: 规则链消息处理的核心逻辑
    • 关键方法:
      • onQueueToRuleEngineMsg(): 处理规则引擎消息
      • onTellNext(): 传递消息到下一个规则节点
      • pushMsgToNode(): 推送消息到规则节点Actor
  5. RuleNodeActor (application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java)

    • 功能: 规则节点级别的Actor执行单个规则节点逻辑
    • 处理流程:
      • 接收消息
      • 执行规则节点逻辑Filter、Transform、Action等
      • 根据关系(成功/失败)传递到下一个节点

3.2 规则节点类型处理

3.2.1 过滤节点Filter Node

功能: 判断消息是否满足条件,决定是否继续处理

示例:

  • 消息类型过滤: 只处理遥测数据
  • 脚本过滤: 使用JavaScript判断

代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/

3.2.2 转换节点Transform Node

功能: 转换消息数据格式

示例:

  • 脚本转换: 使用JavaScript转换数据
  • 删除键值: 删除特定字段

代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/

3.2.3 动作节点Action Node

功能: 执行具体操作

类型:

  • 保存遥测: SaveTelemetryNode - 保存到数据库
  • 保存属性: SaveAttributesNode - 保存到数据库
  • 告警创建: CreateAlarmNode - 创建告警
  • RPC调用: RpcCallNode - 调用RPC
  • 发送通知: SendNotificationNode - 发送通知

代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/

3.2.4 JavaScript节点

功能: 执行JavaScript脚本

流程:

RuleNode (JavaScript节点)
  ↓
构建JavaScript执行请求
  ↓
Kafka (tb.js-evaluator.* 主题)
  ↓
tb-js-executor (执行JavaScript)
  ↓
返回执行结果
  ↓
继续规则链处理

代码位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java

3.3 消息提交策略

位置: application/src/main/java/org/thingsboard/server/service/queue/ruleengine/strategy/

策略类型:

  1. SEQUENTIAL_BY_ORIGINATOR: 按设备顺序处理

    • 同一设备的消息按顺序处理
    • 保证消息的顺序性
  2. SEQUENTIAL_BY_TENANT: 按租户顺序处理

    • 同一租户的消息按顺序处理
  3. BURST: 并发处理

    • 不保证顺序,最大化吞吐量

4. 流量出口

4.1 Kafka消息出口

出口点:

  • 规则链处理完成后,可能需要发送消息到其他服务

Kafka主题:

  • tb.core.*: 发送回核心服务属性更新、RPC请求等
  • tb.transport.*: 发送到传输服务(推送到设备)

消息类型:

  • ToCoreMsg: 发送到核心服务
  • ToTransportMsg: 发送到传输服务

4.2 数据库写入出口

出口点: 规则节点执行数据库操作

流量去向:

  • PostgreSQL: 告警、通知等实体数据
  • Cassandra: 遥测数据、属性数据(如果规则链保存)

关键节点:

  • SaveTelemetryNode: 保存遥测数据
  • SaveAttributesNode: 保存属性数据
  • CreateAlarmNode: 创建告警记录

4.3 外部系统出口

出口点: 外部集成节点

流量去向:

  • HTTP端点REST API调用
  • MQTT Broker
  • 其他外部系统

关键节点:

  • RestApiCallNode: REST API调用
  • MqttNode: MQTT发布
  • KafkaNode: Kafka发布

5. 核心代码模块详解

5.1 TbRuleEngineQueueConsumerManager

位置: application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java

功能:

  • 管理规则引擎的Kafka消费者
  • 实现消息批量处理
  • 支持多种提交策略

关键方法:

  1. processMsgs(): 批量处理消息

    protected void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs, ...) {
        // 获取提交策略
        TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue);
        // 获取处理策略
        TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue);
    
        submitStrategy.init(msgs);
        while (!stopped) {
            // 提交消息处理
            submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg));
    
            // 等待处理完成
            final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
    
            // 分析处理结果
            TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
    
            if (decision.isCommit()) {
                consumer.commit(); // 提交Kafka offset
                break;
            }
        }
    }
    
  2. submitMessage(): 提交消息到Actor系统

    • 创建消息处理上下文
    • 发送到AppActor进行处理

5.2 RuleChainActorMessageProcessor

位置: application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java

功能:

  • 规则链消息处理的核心逻辑
  • 管理规则节点的执行顺序
  • 处理消息在规则节点间的传递

关键方法:

  1. onQueueToRuleEngineMsg(): 处理规则引擎消息

    void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
        TbMsg msg = envelope.getMsg();
        if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
            onTellNext(msg, true); // 从规则链开始处理
        } else {
            // 从指定的规则节点开始处理
            onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), ...);
        }
    }
    
  2. onTellNext(): 传递消息到下一个节点

    private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {
        RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;
        RuleNodeCtx targetCtx;
        if (targetId == null) {
            targetCtx = firstNode; // 从第一个节点开始
        } else {
            targetCtx = nodeActors.get(targetId); // 获取指定的节点
        }
        if (targetCtx != null) {
            pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);
        }
    }
    
  3. pushMsgToNode(): 推送消息到规则节点

    • 获取规则节点Actor
    • 发送消息到节点Actor
    • 处理节点执行结果

5.3 RuleNodeActor

位置: application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java

功能:

  • 执行单个规则节点的逻辑
  • 处理节点执行结果
  • 传递消息到下一个节点

关键处理:

  1. 节点执行:

    private void process(TbMsg msg) {
        // 获取规则节点配置
        RuleNode ruleNode = getNode();
    
        // 创建节点上下文
        TbContext ctx = new TbNodeContext(actorContext, msg, ruleNode, ...);
    
        // 执行节点逻辑
        ruleNode.onMsg(ctx, msg);
    }
    
  2. 消息传递:

    • 根据节点执行结果(成功/失败)
    • 获取对应的关系Success Relation / Failure Relation
    • 传递到下一个节点

5.4 规则节点实现

位置: rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/

5.4.1 SaveTelemetryNode

功能: 保存遥测数据到数据库

代码位置: rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveTelemetryNode.java

处理流程:

  1. 解析TbMsg中的遥测数据
  2. 调用DAO保存到数据库
  3. 传递消息到下一个节点

5.4.2 CreateAlarmNode

功能: 创建告警

代码位置: rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java

处理流程:

  1. 检查是否已存在告警
  2. 创建新的告警记录
  3. 保存到数据库

5.4.3 TbJavaScriptNode

功能: 执行JavaScript脚本

代码位置: rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbJavaScriptNode.java

处理流程:

  1. 构建JavaScript执行请求
  2. 发送到Kafka (tb.js-evaluator.*主题)
  3. 等待tb-js-executor返回结果
  4. 使用执行结果继续规则链处理

6. 数据流向图

┌─────────────────────────────────────────────────────────────────┐
│                      流量入口                                     │
├─────────────────────────────────────────────────────────────────┤
│ Kafka (tb.rule-engine.* 主题)                                   │
│   └─ tb-core 推送的消息                                          │
│       ├─ 设备遥测数据 (TELEMETRY)                                │
│       ├─ 属性更新 (ATTRIBUTES_UPDATED)                           │
│       ├─ 设备事件 (DEVICE_CONNECT/DISCONNECT)                   │
│       └─ RPC响应 (RPC_CALL_FROM_DEVICE)                         │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      流量处理                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消费                                                     │
│    TbRuleEngineQueueConsumerManager.processMsgs()               │
│                                                                   │
│ 2. 提交策略处理                                                   │
│    ├─ SEQUENTIAL_BY_ORIGINATOR: 按设备顺序                      │
│    ├─ SEQUENTIAL_BY_TENANT: 按租户顺序                          │
│    └─ BURST: 并发处理                                           │
│                                                                   │
│ 3. Actor系统路由                                                 │
│    AppActor → TenantActor → RuleChainActor                      │
│                                                                   │
│ 4. 规则链处理                                                    │
│    RuleChainActorMessageProcessor                                │
│      ├─ 获取规则链配置                                           │
│      ├─ 遍历规则节点                                             │
│      └─ 执行规则节点逻辑                                         │
│                                                                   │
│ 5. 规则节点执行                                                  │
│    ├─ 过滤节点: 判断条件                                        │
│    ├─ 转换节点: 数据转换                                        │
│    ├─ 动作节点: 保存数据、创建告警等                            │
│    ├─ JavaScript节点: 调用tb-js-executor                        │
│    └─ 外部节点: 调用外部系统                                     │
│                                                                   │
│ 6. 消息传递                                                      │
│    ├─ Success关系 → 下一个节点                                  │
│    ├─ Failure关系 → 错误处理节点                                │
│    └─ 其他关系 → 条件分支                                       │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      流量出口                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息                                                     │
│    ├─ tb.core.*: 发送到核心服务 (属性更新、RPC请求)              │
│    └─ tb.transport.*: 发送到传输服务 (推送到设备)                │
│                                                                   │
│ 2. 数据库写入                                                    │
│    ├─ PostgreSQL: 告警、通知等实体数据                           │
│    └─ Cassandra: 遥测数据、属性数据                              │
│                                                                   │
│ 3. 外部系统                                                      │
│    ├─ REST API调用                                               │
│    ├─ MQTT发布                                                   │
│    └─ Kafka发布                                                  │
│                                                                   │
│ 4. JavaScript执行                                                │
│    └─ tb.js-evaluator.*: 发送到tb-js-executor                    │
└─────────────────────────────────────────────────────────────────┘

7. 关键配置

7.1 环境变量配置

  • TB_SERVICE_TYPE=tb-rule-engine: 标识为规则引擎服务
  • ZOOKEEPER_URL=zookeeper:2181: ZooKeeper地址
  • TB_QUEUE_TYPE=kafka: 队列类型
  • TB_KAFKA_SERVERS=kafka:9092: Kafka地址

7.2 核心配置项

  • queue.rule-engine.poll-interval: Kafka消息轮询间隔
  • queue.rule-engine.consumer-per-partition: 是否每个分区一个消费者
  • queue.rule-engine.pack-processing-timeout: 消息包处理超时时间
  • actors.rule.engine.queue.max_pending_msgs: 队列最大待处理消息数

7.3 规则链配置

规则链配置存储在数据库中PostgreSQL包含

  • 规则链元数据
  • 规则节点列表
  • 节点间的关系(连接)
  • 节点配置参数

8. 总结

tb-rule-engine 是 ThingsBoard 的规则执行引擎,负责:

  1. 消息消费: 从Kafka消费tb-core推送的消息
  2. 规则链执行: 按照配置的规则链顺序执行规则节点
  3. 节点处理: 支持过滤、转换、动作等多种节点类型
  4. JavaScript执行: 调用tb-js-executor执行JavaScript脚本
  5. 数据持久化: 保存处理结果到数据库
  6. 消息推送: 发送消息到其他服务(核心服务、传输服务等)

整个流程采用了Actor模型消息驱动的架构,支持:

  • 顺序处理: 保证同一设备/租户的消息顺序
  • 并发处理: 最大化吞吐量
  • 灵活配置: 通过规则链配置实现业务逻辑
  • 高可用性: 支持多实例部署