518 lines
19 KiB
Markdown
518 lines
19 KiB
Markdown
# tb-core 流量分析
|
||
|
||
## 1. 镜像概述
|
||
|
||
**tb-core** 是 ThingsBoard 的核心服务,负责:
|
||
- 设备管理(创建设备、更新设备信息等)
|
||
- 用户和租户管理
|
||
- REST API 提供
|
||
- 设备会话管理
|
||
- 消息路由到规则引擎
|
||
- 与数据库交互(PostgreSQL/Cassandra)
|
||
- 缓存管理(Valkey)
|
||
|
||
**镜像名称**: `thingsboard/tb-node`
|
||
**服务类型**: 通过环境变量 `TB_SERVICE_TYPE=tb-core` 区分
|
||
**源码位置**: `application/` 目录
|
||
|
||
---
|
||
|
||
## 2. 流量入口
|
||
|
||
### 2.1 HTTP REST API 入口
|
||
|
||
**入口点**:
|
||
- `application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java` - Spring Boot 主类
|
||
- REST Controller: `application/src/main/java/org/thingsboard/server/controller/`
|
||
|
||
**端口**: `8080`
|
||
|
||
**流量来源**:
|
||
- **tb-web-ui**: Web前端通过HTTP REST API调用
|
||
- **外部API客户端**: 第三方系统通过REST API集成
|
||
- **HAProxy**: 负载均衡器转发HTTP请求
|
||
|
||
**关键接口**:
|
||
- `/api/device` - 设备管理API
|
||
- `/api/tenant` - 租户管理API
|
||
- `/api/user` - 用户管理API
|
||
- `/api/telemetry` - 遥测数据API
|
||
- `/api/rpc` - RPC调用API
|
||
|
||
### 2.2 Kafka 消息入口
|
||
|
||
**入口点**:
|
||
- `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`
|
||
|
||
**Kafka主题**: `tb.core.*`
|
||
|
||
**流量来源**:
|
||
- **传输服务** (tb-mqtt-transport, tb-http-transport等): 发送设备数据到核心服务
|
||
- **规则引擎** (tb-rule-engine): 返回规则链处理结果
|
||
- **tb-vc-executor**: 版本控制相关消息
|
||
|
||
**消息类型**:
|
||
- `ToCoreMsg`: 传输服务发送的消息
|
||
- `ToCoreNotificationMsg`: 通知消息
|
||
- `TransportToDeviceActorMsg`: 设备Actor消息
|
||
|
||
### 2.3 WebSocket 入口
|
||
|
||
**入口点**:
|
||
- `application/src/main/java/org/thingsboard/server/controller/WebSocketController.java`
|
||
|
||
**端口**: `8080` (与HTTP同一端口)
|
||
|
||
**流量来源**:
|
||
- **tb-web-ui**: Web前端通过WebSocket接收实时数据推送
|
||
|
||
---
|
||
|
||
## 3. 流量处理流程
|
||
|
||
### 3.1 REST API 请求处理流程
|
||
|
||
```
|
||
用户请求 (HTTP)
|
||
↓
|
||
HAProxy (可选)
|
||
↓
|
||
Spring MVC DispatcherServlet
|
||
↓
|
||
@RestController (例如: DeviceController)
|
||
↓
|
||
Service层 (例如: DeviceService)
|
||
↓
|
||
DAO层 (例如: DeviceDao)
|
||
↓
|
||
PostgreSQL/Cassandra (数据库操作)
|
||
↓
|
||
返回响应 (HTTP Response)
|
||
```
|
||
|
||
**核心代码模块**:
|
||
|
||
1. **ThingsboardServerApplication** (`application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java`)
|
||
- Spring Boot主类
|
||
- 启动整个应用
|
||
- 组件扫描: `org.thingsboard.server`
|
||
|
||
2. **Controller层** (`application/src/main/java/org/thingsboard/server/controller/`)
|
||
- `DeviceController.java`: 设备管理API
|
||
- `TenantController.java`: 租户管理API
|
||
- `UserController.java`: 用户管理API
|
||
- `WebSocketController.java`: WebSocket连接管理
|
||
|
||
3. **Service层** (`application/src/main/java/org/thingsboard/server/service/`)
|
||
- `DeviceService`: 设备业务逻辑
|
||
- `TenantService`: 租户业务逻辑
|
||
- `UserService`: 用户业务逻辑
|
||
|
||
4. **DAO层** (`dao/src/main/java/org/thingsboard/server/dao/`)
|
||
- `DeviceDao`: 设备数据访问
|
||
- `TenantDao`: 租户数据访问
|
||
- `UserDao`: 用户数据访问
|
||
|
||
### 3.2 Kafka 消息处理流程
|
||
|
||
```
|
||
Kafka (tb.core.* 主题)
|
||
↓
|
||
DefaultTbCoreConsumerService.processMsgs()
|
||
↓
|
||
消息类型判断
|
||
├─ ToDeviceActorMsg → DeviceActor (Actor系统)
|
||
├─ ToSubscriptionMgrMsg → SubscriptionManagerService
|
||
├─ DeviceStateServiceMsg → DeviceStateService
|
||
└─ 其他消息类型...
|
||
↓
|
||
Actor系统处理
|
||
├─ TenantActor (租户级别Actor)
|
||
│ └─ DeviceActor (设备级别Actor)
|
||
└─ RuleChainActor (规则链Actor)
|
||
↓
|
||
业务处理
|
||
├─ 设备状态更新
|
||
├─ 属性更新
|
||
├─ 遥测数据存储
|
||
└─ 推送到规则引擎
|
||
↓
|
||
Kafka (tb.rule-engine.* 主题) → 规则引擎
|
||
```
|
||
|
||
**核心代码模块**:
|
||
|
||
1. **DefaultTbCoreConsumerService** (`application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`)
|
||
- **功能**: 消费Kafka消息并分发到不同的处理服务
|
||
- **关键方法**:
|
||
- `processMsgs()`: 批量处理消息
|
||
- `forwardToDeviceActor()`: 转发消息到DeviceActor
|
||
- `forwardToSubMgrService()`: 转发消息到订阅管理器
|
||
- `forwardToStateService()`: 转发消息到状态服务
|
||
|
||
2. **Actor系统** (`application/src/main/java/org/thingsboard/server/actors/`)
|
||
- **AppActor**: 应用级别Actor,管理所有TenantActor
|
||
- **TenantActor**: 租户级别Actor,管理租户下的设备和规则链
|
||
- **DeviceActor**: 设备级别Actor,处理单个设备的消息
|
||
- **RuleChainActor**: 规则链Actor,管理规则链执行
|
||
|
||
3. **DeviceActor** (`application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`)
|
||
- **功能**: 处理设备相关的所有消息
|
||
- **处理消息类型**:
|
||
- `TransportToDeviceActorMsg`: 传输层消息(遥测、属性、RPC响应)
|
||
- `DeviceAttributesUpdateMsg`: 设备属性更新
|
||
- `DeviceRpcRequestMsg`: 设备RPC请求
|
||
|
||
4. **TenantActor** (`application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`)
|
||
- **功能**: 租户级别消息路由
|
||
- **关键方法**:
|
||
- `onToDeviceActorMsg()`: 路由消息到DeviceActor
|
||
- `onQueueToRuleEngineMsg()`: 推送消息到规则引擎
|
||
|
||
### 3.3 设备数据上报流程(从传输服务)
|
||
|
||
```
|
||
设备 → tb-mqtt-transport (MQTT消息)
|
||
↓
|
||
MqttTransportHandler.processDevicePublish()
|
||
↓
|
||
DefaultTransportService.process() (PostTelemetryMsg/PostAttributeMsg)
|
||
↓
|
||
Kafka (ToCoreMsg) → tb.core.* 主题
|
||
↓
|
||
tb-core: DefaultTbCoreConsumerService.processMsgs()
|
||
↓
|
||
forwardToDeviceActor() → DeviceActor
|
||
↓
|
||
DeviceActor.process() → 处理遥测/属性数据
|
||
↓
|
||
推送消息到规则引擎: pushMsgToRuleEngine()
|
||
↓
|
||
Kafka (ToRuleEngineMsg) → tb.rule-engine.* 主题
|
||
↓
|
||
tb-rule-engine 处理
|
||
```
|
||
|
||
**核心代码模块**:
|
||
|
||
1. **DefaultTransportService** (`common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`)
|
||
- **功能**: 传输服务统一接口,处理设备上报数据
|
||
- **关键方法**:
|
||
- `process(PostTelemetryMsg)`: 处理遥测数据
|
||
- `process(PostAttributeMsg)`: 处理属性数据
|
||
- `sendToCore()`: 发送消息到核心服务(通过Kafka)
|
||
|
||
2. **DefaultTbClusterService** (`application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`)
|
||
- **功能**: 集群服务,负责消息路由和推送到规则引擎
|
||
- **关键方法**:
|
||
- `pushMsgToRuleEngine()`: 推送消息到规则引擎队列
|
||
|
||
### 3.4 规则引擎消息返回流程
|
||
|
||
```
|
||
tb-rule-engine 处理完成
|
||
↓
|
||
Kafka (ToCoreMsg) → tb.core.* 主题
|
||
↓
|
||
tb-core: DefaultTbCoreConsumerService.processMsgs()
|
||
↓
|
||
forwardToDeviceActor() → DeviceActor
|
||
↓
|
||
DeviceActor: 处理规则引擎返回的消息
|
||
├─ 属性更新 → 通知传输服务
|
||
├─ RPC请求 → 发送到设备
|
||
└─ 告警 → 保存到数据库
|
||
↓
|
||
Kafka (ToTransportMsg) → tb.transport.* 主题 (如果需要推送到设备)
|
||
```
|
||
|
||
---
|
||
|
||
## 4. 流量出口
|
||
|
||
### 4.1 HTTP响应出口
|
||
|
||
**出口点**: REST Controller返回响应
|
||
|
||
**流量去向**:
|
||
- **tb-web-ui**: Web前端接收API响应
|
||
- **外部API客户端**: 第三方系统接收响应
|
||
|
||
### 4.2 Kafka消息出口
|
||
|
||
**出口点**:
|
||
- `DefaultTbClusterService.pushMsgToRuleEngine()`: 推送到规则引擎
|
||
- `DefaultTbCoreToTransportService.process()`: 推送到传输服务
|
||
|
||
**Kafka主题**:
|
||
- `tb.rule-engine.*`: 发送到规则引擎
|
||
- `tb.transport.*`: 发送到传输服务
|
||
|
||
**流量去向**:
|
||
- **tb-rule-engine**: 规则链处理
|
||
- **传输服务**: 推送到设备(RPC请求、属性更新等)
|
||
|
||
### 4.3 数据库写入出口
|
||
|
||
**出口点**: DAO层执行数据库操作
|
||
|
||
**流量去向**:
|
||
- **PostgreSQL**: 实体数据(设备、用户、租户等)
|
||
- **Cassandra**: 时序数据(遥测数据,混合模式)
|
||
|
||
### 4.4 缓存写入出口
|
||
|
||
**出口点**: Cache服务
|
||
|
||
**流量去向**:
|
||
- **Valkey**: 设备属性缓存、会话信息缓存
|
||
|
||
---
|
||
|
||
## 5. 核心代码模块详解
|
||
|
||
### 5.1 ThingsboardServerApplication
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java`
|
||
|
||
**功能**:
|
||
- Spring Boot应用入口
|
||
- 组件扫描配置
|
||
- 应用启动完成后记录启动时间
|
||
|
||
**关键代码**:
|
||
```java
|
||
@SpringBootConfiguration
|
||
@EnableAsync
|
||
@EnableScheduling
|
||
@ComponentScan({"org.thingsboard.server", "org.thingsboard.script"})
|
||
public class ThingsboardServerApplication {
|
||
public static void main(String[] args) {
|
||
SpringApplication.run(ThingsboardServerApplication.class, updateArguments(args));
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.2 DefaultTbCoreConsumerService
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`
|
||
|
||
**功能**:
|
||
- 消费Kafka中的`ToCoreMsg`消息
|
||
- 根据消息类型分发到不同的处理服务
|
||
- 管理消息处理的生命周期
|
||
|
||
**关键方法**:
|
||
|
||
1. **processMsgs()**: 批量处理消息
|
||
```java
|
||
private void processMsgs(List<TbProtoQueueMsg<ToCoreMsg>> msgs, ...) {
|
||
// 根据消息类型分发
|
||
if (toCoreMsg.hasToDeviceActorMsg()) {
|
||
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
|
||
} else if (toCoreMsg.hasToSubscriptionMgrMsg()) {
|
||
forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
|
||
}
|
||
// ...
|
||
}
|
||
```
|
||
|
||
2. **forwardToDeviceActor()**: 转发消息到DeviceActor
|
||
- 通过Actor系统将消息路由到对应的设备Actor
|
||
- 使用租户ID和设备ID进行路由
|
||
|
||
3. **forwardToStateService()**: 转发消息到状态服务
|
||
- 处理设备连接、断开、不活动等状态事件
|
||
|
||
### 5.3 Actor系统
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/actors/`
|
||
|
||
**架构**:
|
||
```
|
||
AppActor (应用级别)
|
||
└─ TenantActor (租户级别)
|
||
├─ DeviceActor (设备级别)
|
||
└─ RuleChainActor (规则链级别)
|
||
```
|
||
|
||
**AppActor** (`actors/app/AppActor.java`):
|
||
- 管理所有TenantActor
|
||
- 处理分区变更、组件生命周期等系统级消息
|
||
- 路由消息到对应的TenantActor
|
||
|
||
**TenantActor** (`actors/tenant/TenantActor.java`):
|
||
- 管理租户下的设备和规则链
|
||
- 路由消息到DeviceActor或RuleChainActor
|
||
- 处理租户级别的订阅管理
|
||
|
||
**DeviceActor** (`actors/device/DeviceActor.java`):
|
||
- 处理单个设备的所有消息
|
||
- 管理设备状态、属性、遥测数据
|
||
- 处理设备RPC请求和响应
|
||
|
||
### 5.4 DeviceActor
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`
|
||
|
||
**功能**:
|
||
- 处理设备的遥测数据、属性更新
|
||
- 管理设备会话状态
|
||
- 处理设备RPC请求
|
||
- 推送消息到规则引擎
|
||
|
||
**关键处理流程**:
|
||
|
||
1. **接收遥测数据** (`onPostTelemetry()`):
|
||
- 存储到数据库(PostgreSQL/Cassandra)
|
||
- 构建TbMsg消息
|
||
- 推送到规则引擎
|
||
|
||
2. **接收属性更新** (`onPostAttributes()`):
|
||
- 更新设备属性
|
||
- 触发属性更新订阅
|
||
- 推送到规则引擎
|
||
|
||
3. **推送到规则引擎** (`pushToRuleEngine()`):
|
||
- 构建ToRuleEngineMsg
|
||
- 发送到Kafka的tb.rule-engine.*主题
|
||
|
||
### 5.5 DefaultTbClusterService
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`
|
||
|
||
**功能**:
|
||
- 集群级别的消息路由
|
||
- 推送到规则引擎
|
||
- 推送到核心服务(跨节点)
|
||
|
||
**关键方法**:
|
||
|
||
1. **pushMsgToRuleEngine()**: 推送消息到规则引擎
|
||
```java
|
||
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
|
||
// 解析规则引擎配置
|
||
// 发送到Kafka的tb.rule-engine.*主题
|
||
ruleEngineProducerService.sendToRuleEngine(...);
|
||
}
|
||
```
|
||
|
||
### 5.6 DefaultTbCoreToTransportService
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java`
|
||
|
||
**功能**:
|
||
- 核心服务向传输服务发送消息
|
||
- 发送RPC请求、属性更新通知等到设备
|
||
|
||
**关键方法**:
|
||
|
||
1. **process()**: 发送消息到传输服务
|
||
```java
|
||
public void process(String nodeId, ToTransportMsg msg, ...) {
|
||
// 获取传输服务节点主题
|
||
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId);
|
||
// 发送到Kafka
|
||
tbTransportProducer.send(tpi, queueMsg, callback);
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 6. 数据流向图
|
||
|
||
```
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 流量入口 │
|
||
├─────────────────────────────────────────────────────────────────┤
|
||
│ 1. HTTP REST API (端口8080) │
|
||
│ ├─ tb-web-ui (前端请求) │
|
||
│ ├─ HAProxy (负载均衡) │
|
||
│ └─ 外部API客户端 │
|
||
│ │
|
||
│ 2. Kafka (tb.core.* 主题) │
|
||
│ ├─ tb-mqtt-transport (设备数据) │
|
||
│ ├─ tb-http-transport (设备数据) │
|
||
│ ├─ tb-rule-engine (规则引擎结果) │
|
||
│ └─ tb-vc-executor (版本控制) │
|
||
│ │
|
||
│ 3. WebSocket (端口8080) │
|
||
│ └─ tb-web-ui (实时数据推送) │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
↓
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 流量处理 │
|
||
├─────────────────────────────────────────────────────────────────┤
|
||
│ 1. REST API 处理 │
|
||
│ Controller → Service → DAO → 数据库 │
|
||
│ │
|
||
│ 2. Kafka 消息处理 │
|
||
│ DefaultTbCoreConsumerService │
|
||
│ ├─ forwardToDeviceActor() → DeviceActor │
|
||
│ ├─ forwardToSubMgrService() → SubscriptionManager │
|
||
│ └─ forwardToStateService() → DeviceStateService │
|
||
│ │
|
||
│ 3. Actor 系统处理 │
|
||
│ AppActor → TenantActor → DeviceActor │
|
||
│ ├─ 设备状态管理 │
|
||
│ ├─ 属性更新 │
|
||
│ ├─ 遥测数据存储 │
|
||
│ └─ 推送消息到规则引擎 │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
↓
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 流量出口 │
|
||
├─────────────────────────────────────────────────────────────────┤
|
||
│ 1. HTTP 响应 │
|
||
│ → tb-web-ui / 外部API客户端 │
|
||
│ │
|
||
│ 2. Kafka 消息 │
|
||
│ → tb.rule-engine.* (规则引擎) │
|
||
│ → tb.transport.* (传输服务) │
|
||
│ │
|
||
│ 3. 数据库写入 │
|
||
│ → PostgreSQL (实体数据) │
|
||
│ → Cassandra (时序数据,混合模式) │
|
||
│ │
|
||
│ 4. 缓存写入 │
|
||
│ → Valkey (属性缓存、会话缓存) │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
---
|
||
|
||
## 7. 关键配置
|
||
|
||
### 7.1 环境变量配置
|
||
|
||
- `TB_SERVICE_TYPE=tb-core`: 标识为核心服务
|
||
- `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址
|
||
- `TB_QUEUE_TYPE=kafka`: 队列类型
|
||
- `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址
|
||
- `SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/thingsboard`: 数据库地址
|
||
|
||
### 7.2 核心配置项
|
||
|
||
- `queue.core.poll-interval`: Kafka消息轮询间隔
|
||
- `queue.core.consumer-per-partition`: 是否每个分区一个消费者
|
||
- `actors.rule.engine.queue.max_pending_msgs`: 规则引擎队列最大待处理消息数
|
||
|
||
---
|
||
|
||
## 8. 总结
|
||
|
||
**tb-core** 是 ThingsBoard 的核心枢纽,负责:
|
||
|
||
1. **入口多样性**: HTTP REST API、Kafka消息、WebSocket
|
||
2. **消息路由**: 通过Actor系统将消息路由到正确的处理者
|
||
3. **业务处理**: 设备管理、用户管理、数据存储
|
||
4. **消息推送**: 推送到规则引擎进行规则链处理
|
||
5. **数据持久化**: 与PostgreSQL和Cassandra交互
|
||
6. **缓存管理**: 使用Valkey缓存设备属性
|
||
|
||
整个流程采用了**Actor模型**实现消息驱动的异步处理,通过**Kafka**实现服务间的解耦通信,保证了系统的高可用性和可扩展性。
|
||
|
||
|
||
|
||
|