thingsboard/summary/镜像流量分析/01-tb-core流量分析.md

518 lines
19 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# tb-core 流量分析
## 1. 镜像概述
**tb-core** 是 ThingsBoard 的核心服务,负责:
- 设备管理(创建设备、更新设备信息等)
- 用户和租户管理
- REST API 提供
- 设备会话管理
- 消息路由到规则引擎
- 与数据库交互PostgreSQL/Cassandra
- 缓存管理Valkey
**镜像名称**: `thingsboard/tb-node`
**服务类型**: 通过环境变量 `TB_SERVICE_TYPE=tb-core` 区分
**源码位置**: `application/` 目录
---
## 2. 流量入口
### 2.1 HTTP REST API 入口
**入口点**:
- `application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java` - Spring Boot 主类
- REST Controller: `application/src/main/java/org/thingsboard/server/controller/`
**端口**: `8080`
**流量来源**:
- **tb-web-ui**: Web前端通过HTTP REST API调用
- **外部API客户端**: 第三方系统通过REST API集成
- **HAProxy**: 负载均衡器转发HTTP请求
**关键接口**:
- `/api/device` - 设备管理API
- `/api/tenant` - 租户管理API
- `/api/user` - 用户管理API
- `/api/telemetry` - 遥测数据API
- `/api/rpc` - RPC调用API
### 2.2 Kafka 消息入口
**入口点**:
- `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`
**Kafka主题**: `tb.core.*`
**流量来源**:
- **传输服务** (tb-mqtt-transport, tb-http-transport等): 发送设备数据到核心服务
- **规则引擎** (tb-rule-engine): 返回规则链处理结果
- **tb-vc-executor**: 版本控制相关消息
**消息类型**:
- `ToCoreMsg`: 传输服务发送的消息
- `ToCoreNotificationMsg`: 通知消息
- `TransportToDeviceActorMsg`: 设备Actor消息
### 2.3 WebSocket 入口
**入口点**:
- `application/src/main/java/org/thingsboard/server/controller/WebSocketController.java`
**端口**: `8080` (与HTTP同一端口)
**流量来源**:
- **tb-web-ui**: Web前端通过WebSocket接收实时数据推送
---
## 3. 流量处理流程
### 3.1 REST API 请求处理流程
```
用户请求 (HTTP)
HAProxy (可选)
Spring MVC DispatcherServlet
@RestController (例如: DeviceController)
Service层 (例如: DeviceService)
DAO层 (例如: DeviceDao)
PostgreSQL/Cassandra (数据库操作)
返回响应 (HTTP Response)
```
**核心代码模块**:
1. **ThingsboardServerApplication** (`application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java`)
- Spring Boot主类
- 启动整个应用
- 组件扫描: `org.thingsboard.server`
2. **Controller层** (`application/src/main/java/org/thingsboard/server/controller/`)
- `DeviceController.java`: 设备管理API
- `TenantController.java`: 租户管理API
- `UserController.java`: 用户管理API
- `WebSocketController.java`: WebSocket连接管理
3. **Service层** (`application/src/main/java/org/thingsboard/server/service/`)
- `DeviceService`: 设备业务逻辑
- `TenantService`: 租户业务逻辑
- `UserService`: 用户业务逻辑
4. **DAO层** (`dao/src/main/java/org/thingsboard/server/dao/`)
- `DeviceDao`: 设备数据访问
- `TenantDao`: 租户数据访问
- `UserDao`: 用户数据访问
### 3.2 Kafka 消息处理流程
```
Kafka (tb.core.* 主题)
DefaultTbCoreConsumerService.processMsgs()
消息类型判断
├─ ToDeviceActorMsg → DeviceActor (Actor系统)
├─ ToSubscriptionMgrMsg → SubscriptionManagerService
├─ DeviceStateServiceMsg → DeviceStateService
└─ 其他消息类型...
Actor系统处理
├─ TenantActor (租户级别Actor)
│ └─ DeviceActor (设备级别Actor)
└─ RuleChainActor (规则链Actor)
业务处理
├─ 设备状态更新
├─ 属性更新
├─ 遥测数据存储
└─ 推送到规则引擎
Kafka (tb.rule-engine.* 主题) → 规则引擎
```
**核心代码模块**:
1. **DefaultTbCoreConsumerService** (`application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`)
- **功能**: 消费Kafka消息并分发到不同的处理服务
- **关键方法**:
- `processMsgs()`: 批量处理消息
- `forwardToDeviceActor()`: 转发消息到DeviceActor
- `forwardToSubMgrService()`: 转发消息到订阅管理器
- `forwardToStateService()`: 转发消息到状态服务
2. **Actor系统** (`application/src/main/java/org/thingsboard/server/actors/`)
- **AppActor**: 应用级别Actor管理所有TenantActor
- **TenantActor**: 租户级别Actor管理租户下的设备和规则链
- **DeviceActor**: 设备级别Actor处理单个设备的消息
- **RuleChainActor**: 规则链Actor管理规则链执行
3. **DeviceActor** (`application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`)
- **功能**: 处理设备相关的所有消息
- **处理消息类型**:
- `TransportToDeviceActorMsg`: 传输层消息遥测、属性、RPC响应
- `DeviceAttributesUpdateMsg`: 设备属性更新
- `DeviceRpcRequestMsg`: 设备RPC请求
4. **TenantActor** (`application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`)
- **功能**: 租户级别消息路由
- **关键方法**:
- `onToDeviceActorMsg()`: 路由消息到DeviceActor
- `onQueueToRuleEngineMsg()`: 推送消息到规则引擎
### 3.3 设备数据上报流程(从传输服务)
```
设备 → tb-mqtt-transport (MQTT消息)
MqttTransportHandler.processDevicePublish()
DefaultTransportService.process() (PostTelemetryMsg/PostAttributeMsg)
Kafka (ToCoreMsg) → tb.core.* 主题
tb-core: DefaultTbCoreConsumerService.processMsgs()
forwardToDeviceActor() → DeviceActor
DeviceActor.process() → 处理遥测/属性数据
推送消息到规则引擎: pushMsgToRuleEngine()
Kafka (ToRuleEngineMsg) → tb.rule-engine.* 主题
tb-rule-engine 处理
```
**核心代码模块**:
1. **DefaultTransportService** (`common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`)
- **功能**: 传输服务统一接口,处理设备上报数据
- **关键方法**:
- `process(PostTelemetryMsg)`: 处理遥测数据
- `process(PostAttributeMsg)`: 处理属性数据
- `sendToCore()`: 发送消息到核心服务通过Kafka
2. **DefaultTbClusterService** (`application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`)
- **功能**: 集群服务,负责消息路由和推送到规则引擎
- **关键方法**:
- `pushMsgToRuleEngine()`: 推送消息到规则引擎队列
### 3.4 规则引擎消息返回流程
```
tb-rule-engine 处理完成
Kafka (ToCoreMsg) → tb.core.* 主题
tb-core: DefaultTbCoreConsumerService.processMsgs()
forwardToDeviceActor() → DeviceActor
DeviceActor: 处理规则引擎返回的消息
├─ 属性更新 → 通知传输服务
├─ RPC请求 → 发送到设备
└─ 告警 → 保存到数据库
Kafka (ToTransportMsg) → tb.transport.* 主题 (如果需要推送到设备)
```
---
## 4. 流量出口
### 4.1 HTTP响应出口
**出口点**: REST Controller返回响应
**流量去向**:
- **tb-web-ui**: Web前端接收API响应
- **外部API客户端**: 第三方系统接收响应
### 4.2 Kafka消息出口
**出口点**:
- `DefaultTbClusterService.pushMsgToRuleEngine()`: 推送到规则引擎
- `DefaultTbCoreToTransportService.process()`: 推送到传输服务
**Kafka主题**:
- `tb.rule-engine.*`: 发送到规则引擎
- `tb.transport.*`: 发送到传输服务
**流量去向**:
- **tb-rule-engine**: 规则链处理
- **传输服务**: 推送到设备RPC请求、属性更新等
### 4.3 数据库写入出口
**出口点**: DAO层执行数据库操作
**流量去向**:
- **PostgreSQL**: 实体数据(设备、用户、租户等)
- **Cassandra**: 时序数据(遥测数据,混合模式)
### 4.4 缓存写入出口
**出口点**: Cache服务
**流量去向**:
- **Valkey**: 设备属性缓存、会话信息缓存
---
## 5. 核心代码模块详解
### 5.1 ThingsboardServerApplication
**位置**: `application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java`
**功能**:
- Spring Boot应用入口
- 组件扫描配置
- 应用启动完成后记录启动时间
**关键代码**:
```java
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server", "org.thingsboard.script"})
public class ThingsboardServerApplication {
public static void main(String[] args) {
SpringApplication.run(ThingsboardServerApplication.class, updateArguments(args));
}
}
```
### 5.2 DefaultTbCoreConsumerService
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`
**功能**:
- 消费Kafka中的`ToCoreMsg`消息
- 根据消息类型分发到不同的处理服务
- 管理消息处理的生命周期
**关键方法**:
1. **processMsgs()**: 批量处理消息
```java
private void processMsgs(List<TbProtoQueueMsg<ToCoreMsg>> msgs, ...) {
// 根据消息类型分发
if (toCoreMsg.hasToDeviceActorMsg()) {
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasToSubscriptionMgrMsg()) {
forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
}
// ...
}
```
2. **forwardToDeviceActor()**: 转发消息到DeviceActor
- 通过Actor系统将消息路由到对应的设备Actor
- 使用租户ID和设备ID进行路由
3. **forwardToStateService()**: 转发消息到状态服务
- 处理设备连接、断开、不活动等状态事件
### 5.3 Actor系统
**位置**: `application/src/main/java/org/thingsboard/server/actors/`
**架构**:
```
AppActor (应用级别)
└─ TenantActor (租户级别)
├─ DeviceActor (设备级别)
└─ RuleChainActor (规则链级别)
```
**AppActor** (`actors/app/AppActor.java`):
- 管理所有TenantActor
- 处理分区变更、组件生命周期等系统级消息
- 路由消息到对应的TenantActor
**TenantActor** (`actors/tenant/TenantActor.java`):
- 管理租户下的设备和规则链
- 路由消息到DeviceActor或RuleChainActor
- 处理租户级别的订阅管理
**DeviceActor** (`actors/device/DeviceActor.java`):
- 处理单个设备的所有消息
- 管理设备状态、属性、遥测数据
- 处理设备RPC请求和响应
### 5.4 DeviceActor
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`
**功能**:
- 处理设备的遥测数据、属性更新
- 管理设备会话状态
- 处理设备RPC请求
- 推送消息到规则引擎
**关键处理流程**:
1. **接收遥测数据** (`onPostTelemetry()`):
- 存储到数据库PostgreSQL/Cassandra
- 构建TbMsg消息
- 推送到规则引擎
2. **接收属性更新** (`onPostAttributes()`):
- 更新设备属性
- 触发属性更新订阅
- 推送到规则引擎
3. **推送到规则引擎** (`pushToRuleEngine()`):
- 构建ToRuleEngineMsg
- 发送到Kafka的tb.rule-engine.*主题
### 5.5 DefaultTbClusterService
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`
**功能**:
- 集群级别的消息路由
- 推送到规则引擎
- 推送到核心服务(跨节点)
**关键方法**:
1. **pushMsgToRuleEngine()**: 推送消息到规则引擎
```java
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
// 解析规则引擎配置
// 发送到Kafka的tb.rule-engine.*主题
ruleEngineProducerService.sendToRuleEngine(...);
}
```
### 5.6 DefaultTbCoreToTransportService
**位置**: `application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java`
**功能**:
- 核心服务向传输服务发送消息
- 发送RPC请求、属性更新通知等到设备
**关键方法**:
1. **process()**: 发送消息到传输服务
```java
public void process(String nodeId, ToTransportMsg msg, ...) {
// 获取传输服务节点主题
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId);
// 发送到Kafka
tbTransportProducer.send(tpi, queueMsg, callback);
}
```
---
## 6. 数据流向图
```
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. HTTP REST API (端口8080) │
│ ├─ tb-web-ui (前端请求) │
│ ├─ HAProxy (负载均衡) │
│ └─ 外部API客户端 │
│ │
│ 2. Kafka (tb.core.* 主题) │
│ ├─ tb-mqtt-transport (设备数据) │
│ ├─ tb-http-transport (设备数据) │
│ ├─ tb-rule-engine (规则引擎结果) │
│ └─ tb-vc-executor (版本控制) │
│ │
│ 3. WebSocket (端口8080) │
│ └─ tb-web-ui (实时数据推送) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. REST API 处理 │
│ Controller → Service → DAO → 数据库 │
│ │
│ 2. Kafka 消息处理 │
│ DefaultTbCoreConsumerService │
│ ├─ forwardToDeviceActor() → DeviceActor │
│ ├─ forwardToSubMgrService() → SubscriptionManager │
│ └─ forwardToStateService() → DeviceStateService │
│ │
│ 3. Actor 系统处理 │
│ AppActor → TenantActor → DeviceActor │
│ ├─ 设备状态管理 │
│ ├─ 属性更新 │
│ ├─ 遥测数据存储 │
│ └─ 推送消息到规则引擎 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. HTTP 响应 │
│ → tb-web-ui / 外部API客户端 │
│ │
│ 2. Kafka 消息 │
│ → tb.rule-engine.* (规则引擎) │
│ → tb.transport.* (传输服务) │
│ │
│ 3. 数据库写入 │
│ → PostgreSQL (实体数据) │
│ → Cassandra (时序数据,混合模式) │
│ │
│ 4. 缓存写入 │
│ → Valkey (属性缓存、会话缓存) │
└─────────────────────────────────────────────────────────────────┘
```
---
## 7. 关键配置
### 7.1 环境变量配置
- `TB_SERVICE_TYPE=tb-core`: 标识为核心服务
- `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址
- `TB_QUEUE_TYPE=kafka`: 队列类型
- `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址
- `SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/thingsboard`: 数据库地址
### 7.2 核心配置项
- `queue.core.poll-interval`: Kafka消息轮询间隔
- `queue.core.consumer-per-partition`: 是否每个分区一个消费者
- `actors.rule.engine.queue.max_pending_msgs`: 规则引擎队列最大待处理消息数
---
## 8. 总结
**tb-core** 是 ThingsBoard 的核心枢纽,负责:
1. **入口多样性**: HTTP REST API、Kafka消息、WebSocket
2. **消息路由**: 通过Actor系统将消息路由到正确的处理者
3. **业务处理**: 设备管理、用户管理、数据存储
4. **消息推送**: 推送到规则引擎进行规则链处理
5. **数据持久化**: 与PostgreSQL和Cassandra交互
6. **缓存管理**: 使用Valkey缓存设备属性
整个流程采用了**Actor模型**实现消息驱动的异步处理,通过**Kafka**实现服务间的解耦通信,保证了系统的高可用性和可扩展性。