# tb-core 流量分析 ## 1. 镜像概述 **tb-core** 是 ThingsBoard 的核心服务,负责: - 设备管理(创建设备、更新设备信息等) - 用户和租户管理 - REST API 提供 - 设备会话管理 - 消息路由到规则引擎 - 与数据库交互(PostgreSQL/Cassandra) - 缓存管理(Valkey) **镜像名称**: `thingsboard/tb-node` **服务类型**: 通过环境变量 `TB_SERVICE_TYPE=tb-core` 区分 **源码位置**: `application/` 目录 --- ## 2. 流量入口 ### 2.1 HTTP REST API 入口 **入口点**: - `application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java` - Spring Boot 主类 - REST Controller: `application/src/main/java/org/thingsboard/server/controller/` **端口**: `8080` **流量来源**: - **tb-web-ui**: Web前端通过HTTP REST API调用 - **外部API客户端**: 第三方系统通过REST API集成 - **HAProxy**: 负载均衡器转发HTTP请求 **关键接口**: - `/api/device` - 设备管理API - `/api/tenant` - 租户管理API - `/api/user` - 用户管理API - `/api/telemetry` - 遥测数据API - `/api/rpc` - RPC调用API ### 2.2 Kafka 消息入口 **入口点**: - `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java` **Kafka主题**: `tb.core.*` **流量来源**: - **传输服务** (tb-mqtt-transport, tb-http-transport等): 发送设备数据到核心服务 - **规则引擎** (tb-rule-engine): 返回规则链处理结果 - **tb-vc-executor**: 版本控制相关消息 **消息类型**: - `ToCoreMsg`: 传输服务发送的消息 - `ToCoreNotificationMsg`: 通知消息 - `TransportToDeviceActorMsg`: 设备Actor消息 ### 2.3 WebSocket 入口 **入口点**: - `application/src/main/java/org/thingsboard/server/controller/WebSocketController.java` **端口**: `8080` (与HTTP同一端口) **流量来源**: - **tb-web-ui**: Web前端通过WebSocket接收实时数据推送 --- ## 3. 流量处理流程 ### 3.1 REST API 请求处理流程 ``` 用户请求 (HTTP) ↓ HAProxy (可选) ↓ Spring MVC DispatcherServlet ↓ @RestController (例如: DeviceController) ↓ Service层 (例如: DeviceService) ↓ DAO层 (例如: DeviceDao) ↓ PostgreSQL/Cassandra (数据库操作) ↓ 返回响应 (HTTP Response) ``` **核心代码模块**: 1. **ThingsboardServerApplication** (`application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java`) - Spring Boot主类 - 启动整个应用 - 组件扫描: `org.thingsboard.server` 2. **Controller层** (`application/src/main/java/org/thingsboard/server/controller/`) - `DeviceController.java`: 设备管理API - `TenantController.java`: 租户管理API - `UserController.java`: 用户管理API - `WebSocketController.java`: WebSocket连接管理 3. **Service层** (`application/src/main/java/org/thingsboard/server/service/`) - `DeviceService`: 设备业务逻辑 - `TenantService`: 租户业务逻辑 - `UserService`: 用户业务逻辑 4. **DAO层** (`dao/src/main/java/org/thingsboard/server/dao/`) - `DeviceDao`: 设备数据访问 - `TenantDao`: 租户数据访问 - `UserDao`: 用户数据访问 ### 3.2 Kafka 消息处理流程 ``` Kafka (tb.core.* 主题) ↓ DefaultTbCoreConsumerService.processMsgs() ↓ 消息类型判断 ├─ ToDeviceActorMsg → DeviceActor (Actor系统) ├─ ToSubscriptionMgrMsg → SubscriptionManagerService ├─ DeviceStateServiceMsg → DeviceStateService └─ 其他消息类型... ↓ Actor系统处理 ├─ TenantActor (租户级别Actor) │ └─ DeviceActor (设备级别Actor) └─ RuleChainActor (规则链Actor) ↓ 业务处理 ├─ 设备状态更新 ├─ 属性更新 ├─ 遥测数据存储 └─ 推送到规则引擎 ↓ Kafka (tb.rule-engine.* 主题) → 规则引擎 ``` **核心代码模块**: 1. **DefaultTbCoreConsumerService** (`application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java`) - **功能**: 消费Kafka消息并分发到不同的处理服务 - **关键方法**: - `processMsgs()`: 批量处理消息 - `forwardToDeviceActor()`: 转发消息到DeviceActor - `forwardToSubMgrService()`: 转发消息到订阅管理器 - `forwardToStateService()`: 转发消息到状态服务 2. **Actor系统** (`application/src/main/java/org/thingsboard/server/actors/`) - **AppActor**: 应用级别Actor,管理所有TenantActor - **TenantActor**: 租户级别Actor,管理租户下的设备和规则链 - **DeviceActor**: 设备级别Actor,处理单个设备的消息 - **RuleChainActor**: 规则链Actor,管理规则链执行 3. **DeviceActor** (`application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java`) - **功能**: 处理设备相关的所有消息 - **处理消息类型**: - `TransportToDeviceActorMsg`: 传输层消息(遥测、属性、RPC响应) - `DeviceAttributesUpdateMsg`: 设备属性更新 - `DeviceRpcRequestMsg`: 设备RPC请求 4. **TenantActor** (`application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java`) - **功能**: 租户级别消息路由 - **关键方法**: - `onToDeviceActorMsg()`: 路由消息到DeviceActor - `onQueueToRuleEngineMsg()`: 推送消息到规则引擎 ### 3.3 设备数据上报流程(从传输服务) ``` 设备 → tb-mqtt-transport (MQTT消息) ↓ MqttTransportHandler.processDevicePublish() ↓ DefaultTransportService.process() (PostTelemetryMsg/PostAttributeMsg) ↓ Kafka (ToCoreMsg) → tb.core.* 主题 ↓ tb-core: DefaultTbCoreConsumerService.processMsgs() ↓ forwardToDeviceActor() → DeviceActor ↓ DeviceActor.process() → 处理遥测/属性数据 ↓ 推送消息到规则引擎: pushMsgToRuleEngine() ↓ Kafka (ToRuleEngineMsg) → tb.rule-engine.* 主题 ↓ tb-rule-engine 处理 ``` **核心代码模块**: 1. **DefaultTransportService** (`common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`) - **功能**: 传输服务统一接口,处理设备上报数据 - **关键方法**: - `process(PostTelemetryMsg)`: 处理遥测数据 - `process(PostAttributeMsg)`: 处理属性数据 - `sendToCore()`: 发送消息到核心服务(通过Kafka) 2. **DefaultTbClusterService** (`application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java`) - **功能**: 集群服务,负责消息路由和推送到规则引擎 - **关键方法**: - `pushMsgToRuleEngine()`: 推送消息到规则引擎队列 ### 3.4 规则引擎消息返回流程 ``` tb-rule-engine 处理完成 ↓ Kafka (ToCoreMsg) → tb.core.* 主题 ↓ tb-core: DefaultTbCoreConsumerService.processMsgs() ↓ forwardToDeviceActor() → DeviceActor ↓ DeviceActor: 处理规则引擎返回的消息 ├─ 属性更新 → 通知传输服务 ├─ RPC请求 → 发送到设备 └─ 告警 → 保存到数据库 ↓ Kafka (ToTransportMsg) → tb.transport.* 主题 (如果需要推送到设备) ``` --- ## 4. 流量出口 ### 4.1 HTTP响应出口 **出口点**: REST Controller返回响应 **流量去向**: - **tb-web-ui**: Web前端接收API响应 - **外部API客户端**: 第三方系统接收响应 ### 4.2 Kafka消息出口 **出口点**: - `DefaultTbClusterService.pushMsgToRuleEngine()`: 推送到规则引擎 - `DefaultTbCoreToTransportService.process()`: 推送到传输服务 **Kafka主题**: - `tb.rule-engine.*`: 发送到规则引擎 - `tb.transport.*`: 发送到传输服务 **流量去向**: - **tb-rule-engine**: 规则链处理 - **传输服务**: 推送到设备(RPC请求、属性更新等) ### 4.3 数据库写入出口 **出口点**: DAO层执行数据库操作 **流量去向**: - **PostgreSQL**: 实体数据(设备、用户、租户等) - **Cassandra**: 时序数据(遥测数据,混合模式) ### 4.4 缓存写入出口 **出口点**: Cache服务 **流量去向**: - **Valkey**: 设备属性缓存、会话信息缓存 --- ## 5. 核心代码模块详解 ### 5.1 ThingsboardServerApplication **位置**: `application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java` **功能**: - Spring Boot应用入口 - 组件扫描配置 - 应用启动完成后记录启动时间 **关键代码**: ```java @SpringBootConfiguration @EnableAsync @EnableScheduling @ComponentScan({"org.thingsboard.server", "org.thingsboard.script"}) public class ThingsboardServerApplication { public static void main(String[] args) { SpringApplication.run(ThingsboardServerApplication.class, updateArguments(args)); } } ``` ### 5.2 DefaultTbCoreConsumerService **位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java` **功能**: - 消费Kafka中的`ToCoreMsg`消息 - 根据消息类型分发到不同的处理服务 - 管理消息处理的生命周期 **关键方法**: 1. **processMsgs()**: 批量处理消息 ```java private void processMsgs(List> msgs, ...) { // 根据消息类型分发 if (toCoreMsg.hasToDeviceActorMsg()) { forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); } else if (toCoreMsg.hasToSubscriptionMgrMsg()) { forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback); } // ... } ``` 2. **forwardToDeviceActor()**: 转发消息到DeviceActor - 通过Actor系统将消息路由到对应的设备Actor - 使用租户ID和设备ID进行路由 3. **forwardToStateService()**: 转发消息到状态服务 - 处理设备连接、断开、不活动等状态事件 ### 5.3 Actor系统 **位置**: `application/src/main/java/org/thingsboard/server/actors/` **架构**: ``` AppActor (应用级别) └─ TenantActor (租户级别) ├─ DeviceActor (设备级别) └─ RuleChainActor (规则链级别) ``` **AppActor** (`actors/app/AppActor.java`): - 管理所有TenantActor - 处理分区变更、组件生命周期等系统级消息 - 路由消息到对应的TenantActor **TenantActor** (`actors/tenant/TenantActor.java`): - 管理租户下的设备和规则链 - 路由消息到DeviceActor或RuleChainActor - 处理租户级别的订阅管理 **DeviceActor** (`actors/device/DeviceActor.java`): - 处理单个设备的所有消息 - 管理设备状态、属性、遥测数据 - 处理设备RPC请求和响应 ### 5.4 DeviceActor **位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java` **功能**: - 处理设备的遥测数据、属性更新 - 管理设备会话状态 - 处理设备RPC请求 - 推送消息到规则引擎 **关键处理流程**: 1. **接收遥测数据** (`onPostTelemetry()`): - 存储到数据库(PostgreSQL/Cassandra) - 构建TbMsg消息 - 推送到规则引擎 2. **接收属性更新** (`onPostAttributes()`): - 更新设备属性 - 触发属性更新订阅 - 推送到规则引擎 3. **推送到规则引擎** (`pushToRuleEngine()`): - 构建ToRuleEngineMsg - 发送到Kafka的tb.rule-engine.*主题 ### 5.5 DefaultTbClusterService **位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java` **功能**: - 集群级别的消息路由 - 推送到规则引擎 - 推送到核心服务(跨节点) **关键方法**: 1. **pushMsgToRuleEngine()**: 推送消息到规则引擎 ```java public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { // 解析规则引擎配置 // 发送到Kafka的tb.rule-engine.*主题 ruleEngineProducerService.sendToRuleEngine(...); } ``` ### 5.6 DefaultTbCoreToTransportService **位置**: `application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java` **功能**: - 核心服务向传输服务发送消息 - 发送RPC请求、属性更新通知等到设备 **关键方法**: 1. **process()**: 发送消息到传输服务 ```java public void process(String nodeId, ToTransportMsg msg, ...) { // 获取传输服务节点主题 TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId); // 发送到Kafka tbTransportProducer.send(tpi, queueMsg, callback); } ``` --- ## 6. 数据流向图 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 流量入口 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. HTTP REST API (端口8080) │ │ ├─ tb-web-ui (前端请求) │ │ ├─ HAProxy (负载均衡) │ │ └─ 外部API客户端 │ │ │ │ 2. Kafka (tb.core.* 主题) │ │ ├─ tb-mqtt-transport (设备数据) │ │ ├─ tb-http-transport (设备数据) │ │ ├─ tb-rule-engine (规则引擎结果) │ │ └─ tb-vc-executor (版本控制) │ │ │ │ 3. WebSocket (端口8080) │ │ └─ tb-web-ui (实时数据推送) │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 流量处理 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. REST API 处理 │ │ Controller → Service → DAO → 数据库 │ │ │ │ 2. Kafka 消息处理 │ │ DefaultTbCoreConsumerService │ │ ├─ forwardToDeviceActor() → DeviceActor │ │ ├─ forwardToSubMgrService() → SubscriptionManager │ │ └─ forwardToStateService() → DeviceStateService │ │ │ │ 3. Actor 系统处理 │ │ AppActor → TenantActor → DeviceActor │ │ ├─ 设备状态管理 │ │ ├─ 属性更新 │ │ ├─ 遥测数据存储 │ │ └─ 推送消息到规则引擎 │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 流量出口 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. HTTP 响应 │ │ → tb-web-ui / 外部API客户端 │ │ │ │ 2. Kafka 消息 │ │ → tb.rule-engine.* (规则引擎) │ │ → tb.transport.* (传输服务) │ │ │ │ 3. 数据库写入 │ │ → PostgreSQL (实体数据) │ │ → Cassandra (时序数据,混合模式) │ │ │ │ 4. 缓存写入 │ │ → Valkey (属性缓存、会话缓存) │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 7. 关键配置 ### 7.1 环境变量配置 - `TB_SERVICE_TYPE=tb-core`: 标识为核心服务 - `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址 - `TB_QUEUE_TYPE=kafka`: 队列类型 - `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址 - `SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/thingsboard`: 数据库地址 ### 7.2 核心配置项 - `queue.core.poll-interval`: Kafka消息轮询间隔 - `queue.core.consumer-per-partition`: 是否每个分区一个消费者 - `actors.rule.engine.queue.max_pending_msgs`: 规则引擎队列最大待处理消息数 --- ## 8. 总结 **tb-core** 是 ThingsBoard 的核心枢纽,负责: 1. **入口多样性**: HTTP REST API、Kafka消息、WebSocket 2. **消息路由**: 通过Actor系统将消息路由到正确的处理者 3. **业务处理**: 设备管理、用户管理、数据存储 4. **消息推送**: 推送到规则引擎进行规则链处理 5. **数据持久化**: 与PostgreSQL和Cassandra交互 6. **缓存管理**: 使用Valkey缓存设备属性 整个流程采用了**Actor模型**实现消息驱动的异步处理,通过**Kafka**实现服务间的解耦通信,保证了系统的高可用性和可扩展性。