thingsboard/summary/镜像流量分析/01-tb-core流量分析.md

19 KiB
Raw Permalink Blame History

tb-core 流量分析

1. 镜像概述

tb-core 是 ThingsBoard 的核心服务,负责:

  • 设备管理(创建设备、更新设备信息等)
  • 用户和租户管理
  • REST API 提供
  • 设备会话管理
  • 消息路由到规则引擎
  • 与数据库交互PostgreSQL/Cassandra
  • 缓存管理Valkey

镜像名称: thingsboard/tb-node
服务类型: 通过环境变量 TB_SERVICE_TYPE=tb-core 区分
源码位置: application/ 目录


2. 流量入口

2.1 HTTP REST API 入口

入口点:

  • application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java - Spring Boot 主类
  • REST Controller: application/src/main/java/org/thingsboard/server/controller/

端口: 8080

流量来源:

  • tb-web-ui: Web前端通过HTTP REST API调用
  • 外部API客户端: 第三方系统通过REST API集成
  • HAProxy: 负载均衡器转发HTTP请求

关键接口:

  • /api/device - 设备管理API
  • /api/tenant - 租户管理API
  • /api/user - 用户管理API
  • /api/telemetry - 遥测数据API
  • /api/rpc - RPC调用API

2.2 Kafka 消息入口

入口点:

  • application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

Kafka主题: tb.core.*

流量来源:

  • 传输服务 (tb-mqtt-transport, tb-http-transport等): 发送设备数据到核心服务
  • 规则引擎 (tb-rule-engine): 返回规则链处理结果
  • tb-vc-executor: 版本控制相关消息

消息类型:

  • ToCoreMsg: 传输服务发送的消息
  • ToCoreNotificationMsg: 通知消息
  • TransportToDeviceActorMsg: 设备Actor消息

2.3 WebSocket 入口

入口点:

  • application/src/main/java/org/thingsboard/server/controller/WebSocketController.java

端口: 8080 (与HTTP同一端口)

流量来源:

  • tb-web-ui: Web前端通过WebSocket接收实时数据推送

3. 流量处理流程

3.1 REST API 请求处理流程

用户请求 (HTTP)
  ↓
HAProxy (可选)
  ↓
Spring MVC DispatcherServlet
  ↓
@RestController (例如: DeviceController)
  ↓
Service层 (例如: DeviceService)
  ↓
DAO层 (例如: DeviceDao)
  ↓
PostgreSQL/Cassandra (数据库操作)
  ↓
返回响应 (HTTP Response)

核心代码模块:

  1. ThingsboardServerApplication (application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java)

    • Spring Boot主类
    • 启动整个应用
    • 组件扫描: org.thingsboard.server
  2. Controller层 (application/src/main/java/org/thingsboard/server/controller/)

    • DeviceController.java: 设备管理API
    • TenantController.java: 租户管理API
    • UserController.java: 用户管理API
    • WebSocketController.java: WebSocket连接管理
  3. Service层 (application/src/main/java/org/thingsboard/server/service/)

    • DeviceService: 设备业务逻辑
    • TenantService: 租户业务逻辑
    • UserService: 用户业务逻辑
  4. DAO层 (dao/src/main/java/org/thingsboard/server/dao/)

    • DeviceDao: 设备数据访问
    • TenantDao: 租户数据访问
    • UserDao: 用户数据访问

3.2 Kafka 消息处理流程

Kafka (tb.core.* 主题)
  ↓
DefaultTbCoreConsumerService.processMsgs()
  ↓
消息类型判断
  ├─ ToDeviceActorMsg → DeviceActor (Actor系统)
  ├─ ToSubscriptionMgrMsg → SubscriptionManagerService
  ├─ DeviceStateServiceMsg → DeviceStateService
  └─ 其他消息类型...
  ↓
Actor系统处理
  ├─ TenantActor (租户级别Actor)
  │   └─ DeviceActor (设备级别Actor)
  └─ RuleChainActor (规则链Actor)
  ↓
业务处理
  ├─ 设备状态更新
  ├─ 属性更新
  ├─ 遥测数据存储
  └─ 推送到规则引擎
  ↓
Kafka (tb.rule-engine.* 主题) → 规则引擎

核心代码模块:

  1. DefaultTbCoreConsumerService (application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java)

    • 功能: 消费Kafka消息并分发到不同的处理服务
    • 关键方法:
      • processMsgs(): 批量处理消息
      • forwardToDeviceActor(): 转发消息到DeviceActor
      • forwardToSubMgrService(): 转发消息到订阅管理器
      • forwardToStateService(): 转发消息到状态服务
  2. Actor系统 (application/src/main/java/org/thingsboard/server/actors/)

    • AppActor: 应用级别Actor管理所有TenantActor
    • TenantActor: 租户级别Actor管理租户下的设备和规则链
    • DeviceActor: 设备级别Actor处理单个设备的消息
    • RuleChainActor: 规则链Actor管理规则链执行
  3. DeviceActor (application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java)

    • 功能: 处理设备相关的所有消息
    • 处理消息类型:
      • TransportToDeviceActorMsg: 传输层消息遥测、属性、RPC响应
      • DeviceAttributesUpdateMsg: 设备属性更新
      • DeviceRpcRequestMsg: 设备RPC请求
  4. TenantActor (application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java)

    • 功能: 租户级别消息路由
    • 关键方法:
      • onToDeviceActorMsg(): 路由消息到DeviceActor
      • onQueueToRuleEngineMsg(): 推送消息到规则引擎

3.3 设备数据上报流程(从传输服务)

设备 → tb-mqtt-transport (MQTT消息)
  ↓
MqttTransportHandler.processDevicePublish()
  ↓
DefaultTransportService.process() (PostTelemetryMsg/PostAttributeMsg)
  ↓
Kafka (ToCoreMsg) → tb.core.* 主题
  ↓
tb-core: DefaultTbCoreConsumerService.processMsgs()
  ↓
forwardToDeviceActor() → DeviceActor
  ↓
DeviceActor.process() → 处理遥测/属性数据
  ↓
推送消息到规则引擎: pushMsgToRuleEngine()
  ↓
Kafka (ToRuleEngineMsg) → tb.rule-engine.* 主题
  ↓
tb-rule-engine 处理

核心代码模块:

  1. DefaultTransportService (common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java)

    • 功能: 传输服务统一接口,处理设备上报数据
    • 关键方法:
      • process(PostTelemetryMsg): 处理遥测数据
      • process(PostAttributeMsg): 处理属性数据
      • sendToCore(): 发送消息到核心服务通过Kafka
  2. DefaultTbClusterService (application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java)

    • 功能: 集群服务,负责消息路由和推送到规则引擎
    • 关键方法:
      • pushMsgToRuleEngine(): 推送消息到规则引擎队列

3.4 规则引擎消息返回流程

tb-rule-engine 处理完成
  ↓
Kafka (ToCoreMsg) → tb.core.* 主题
  ↓
tb-core: DefaultTbCoreConsumerService.processMsgs()
  ↓
forwardToDeviceActor() → DeviceActor
  ↓
DeviceActor: 处理规则引擎返回的消息
  ├─ 属性更新 → 通知传输服务
  ├─ RPC请求 → 发送到设备
  └─ 告警 → 保存到数据库
  ↓
Kafka (ToTransportMsg) → tb.transport.* 主题 (如果需要推送到设备)

4. 流量出口

4.1 HTTP响应出口

出口点: REST Controller返回响应

流量去向:

  • tb-web-ui: Web前端接收API响应
  • 外部API客户端: 第三方系统接收响应

4.2 Kafka消息出口

出口点:

  • DefaultTbClusterService.pushMsgToRuleEngine(): 推送到规则引擎
  • DefaultTbCoreToTransportService.process(): 推送到传输服务

Kafka主题:

  • tb.rule-engine.*: 发送到规则引擎
  • tb.transport.*: 发送到传输服务

流量去向:

  • tb-rule-engine: 规则链处理
  • 传输服务: 推送到设备RPC请求、属性更新等

4.3 数据库写入出口

出口点: DAO层执行数据库操作

流量去向:

  • PostgreSQL: 实体数据(设备、用户、租户等)
  • Cassandra: 时序数据(遥测数据,混合模式)

4.4 缓存写入出口

出口点: Cache服务

流量去向:

  • Valkey: 设备属性缓存、会话信息缓存

5. 核心代码模块详解

5.1 ThingsboardServerApplication

位置: application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java

功能:

  • Spring Boot应用入口
  • 组件扫描配置
  • 应用启动完成后记录启动时间

关键代码:

@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server", "org.thingsboard.script"})
public class ThingsboardServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ThingsboardServerApplication.class, updateArguments(args));
    }
}

5.2 DefaultTbCoreConsumerService

位置: application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

功能:

  • 消费Kafka中的ToCoreMsg消息
  • 根据消息类型分发到不同的处理服务
  • 管理消息处理的生命周期

关键方法:

  1. processMsgs(): 批量处理消息

    private void processMsgs(List<TbProtoQueueMsg<ToCoreMsg>> msgs, ...) {
        // 根据消息类型分发
        if (toCoreMsg.hasToDeviceActorMsg()) {
            forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
        } else if (toCoreMsg.hasToSubscriptionMgrMsg()) {
            forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
        }
        // ...
    }
    
  2. forwardToDeviceActor(): 转发消息到DeviceActor

    • 通过Actor系统将消息路由到对应的设备Actor
    • 使用租户ID和设备ID进行路由
  3. forwardToStateService(): 转发消息到状态服务

    • 处理设备连接、断开、不活动等状态事件

5.3 Actor系统

位置: application/src/main/java/org/thingsboard/server/actors/

架构:

AppActor (应用级别)
  └─ TenantActor (租户级别)
      ├─ DeviceActor (设备级别)
      └─ RuleChainActor (规则链级别)

AppActor (actors/app/AppActor.java):

  • 管理所有TenantActor
  • 处理分区变更、组件生命周期等系统级消息
  • 路由消息到对应的TenantActor

TenantActor (actors/tenant/TenantActor.java):

  • 管理租户下的设备和规则链
  • 路由消息到DeviceActor或RuleChainActor
  • 处理租户级别的订阅管理

DeviceActor (actors/device/DeviceActor.java):

  • 处理单个设备的所有消息
  • 管理设备状态、属性、遥测数据
  • 处理设备RPC请求和响应

5.4 DeviceActor

位置: application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java

功能:

  • 处理设备的遥测数据、属性更新
  • 管理设备会话状态
  • 处理设备RPC请求
  • 推送消息到规则引擎

关键处理流程:

  1. 接收遥测数据 (onPostTelemetry()):

    • 存储到数据库PostgreSQL/Cassandra
    • 构建TbMsg消息
    • 推送到规则引擎
  2. 接收属性更新 (onPostAttributes()):

    • 更新设备属性
    • 触发属性更新订阅
    • 推送到规则引擎
  3. 推送到规则引擎 (pushToRuleEngine()):

    • 构建ToRuleEngineMsg
    • 发送到Kafka的tb.rule-engine.*主题

5.5 DefaultTbClusterService

位置: application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

功能:

  • 集群级别的消息路由
  • 推送到规则引擎
  • 推送到核心服务(跨节点)

关键方法:

  1. pushMsgToRuleEngine(): 推送消息到规则引擎
    public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
        // 解析规则引擎配置
        // 发送到Kafka的tb.rule-engine.*主题
        ruleEngineProducerService.sendToRuleEngine(...);
    }
    

5.6 DefaultTbCoreToTransportService

位置: application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java

功能:

  • 核心服务向传输服务发送消息
  • 发送RPC请求、属性更新通知等到设备

关键方法:

  1. process(): 发送消息到传输服务
    public void process(String nodeId, ToTransportMsg msg, ...) {
        // 获取传输服务节点主题
        TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId);
        // 发送到Kafka
        tbTransportProducer.send(tpi, queueMsg, callback);
    }
    

6. 数据流向图

┌─────────────────────────────────────────────────────────────────┐
│                      流量入口                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. HTTP REST API (端口8080)                                      │
│    ├─ tb-web-ui (前端请求)                                        │
│    ├─ HAProxy (负载均衡)                                         │
│    └─ 外部API客户端                                                │
│                                                                   │
│ 2. Kafka (tb.core.* 主题)                                        │
│    ├─ tb-mqtt-transport (设备数据)                                │
│    ├─ tb-http-transport (设备数据)                                │
│    ├─ tb-rule-engine (规则引擎结果)                               │
│    └─ tb-vc-executor (版本控制)                                   │
│                                                                   │
│ 3. WebSocket (端口8080)                                          │
│    └─ tb-web-ui (实时数据推送)                                    │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      流量处理                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. REST API 处理                                                 │
│    Controller → Service → DAO → 数据库                           │
│                                                                   │
│ 2. Kafka 消息处理                                                │
│    DefaultTbCoreConsumerService                                 │
│      ├─ forwardToDeviceActor() → DeviceActor                    │
│      ├─ forwardToSubMgrService() → SubscriptionManager         │
│      └─ forwardToStateService() → DeviceStateService            │
│                                                                   │
│ 3. Actor 系统处理                                                │
│    AppActor → TenantActor → DeviceActor                         │
│      ├─ 设备状态管理                                              │
│      ├─ 属性更新                                                  │
│      ├─ 遥测数据存储                                              │
│      └─ 推送消息到规则引擎                                        │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      流量出口                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. HTTP 响应                                                     │
│    → tb-web-ui / 外部API客户端                                    │
│                                                                   │
│ 2. Kafka 消息                                                     │
│    → tb.rule-engine.* (规则引擎)                                 │
│    → tb.transport.* (传输服务)                                   │
│                                                                   │
│ 3. 数据库写入                                                     │
│    → PostgreSQL (实体数据)                                       │
│    → Cassandra (时序数据,混合模式)                              │
│                                                                   │
│ 4. 缓存写入                                                      │
│    → Valkey (属性缓存、会话缓存)                                 │
└─────────────────────────────────────────────────────────────────┘

7. 关键配置

7.1 环境变量配置

  • TB_SERVICE_TYPE=tb-core: 标识为核心服务
  • ZOOKEEPER_URL=zookeeper:2181: ZooKeeper地址
  • TB_QUEUE_TYPE=kafka: 队列类型
  • TB_KAFKA_SERVERS=kafka:9092: Kafka地址
  • SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/thingsboard: 数据库地址

7.2 核心配置项

  • queue.core.poll-interval: Kafka消息轮询间隔
  • queue.core.consumer-per-partition: 是否每个分区一个消费者
  • actors.rule.engine.queue.max_pending_msgs: 规则引擎队列最大待处理消息数

8. 总结

tb-core 是 ThingsBoard 的核心枢纽,负责:

  1. 入口多样性: HTTP REST API、Kafka消息、WebSocket
  2. 消息路由: 通过Actor系统将消息路由到正确的处理者
  3. 业务处理: 设备管理、用户管理、数据存储
  4. 消息推送: 推送到规则引擎进行规则链处理
  5. 数据持久化: 与PostgreSQL和Cassandra交互
  6. 缓存管理: 使用Valkey缓存设备属性

整个流程采用了Actor模型实现消息驱动的异步处理,通过Kafka实现服务间的解耦通信,保证了系统的高可用性和可扩展性。