19 KiB
tb-core 流量分析
1. 镜像概述
tb-core 是 ThingsBoard 的核心服务,负责:
- 设备管理(创建设备、更新设备信息等)
- 用户和租户管理
- REST API 提供
- 设备会话管理
- 消息路由到规则引擎
- 与数据库交互(PostgreSQL/Cassandra)
- 缓存管理(Valkey)
镜像名称: thingsboard/tb-node
服务类型: 通过环境变量 TB_SERVICE_TYPE=tb-core 区分
源码位置: application/ 目录
2. 流量入口
2.1 HTTP REST API 入口
入口点:
application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java- Spring Boot 主类- REST Controller:
application/src/main/java/org/thingsboard/server/controller/
端口: 8080
流量来源:
- tb-web-ui: Web前端通过HTTP REST API调用
- 外部API客户端: 第三方系统通过REST API集成
- HAProxy: 负载均衡器转发HTTP请求
关键接口:
/api/device- 设备管理API/api/tenant- 租户管理API/api/user- 用户管理API/api/telemetry- 遥测数据API/api/rpc- RPC调用API
2.2 Kafka 消息入口
入口点:
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
Kafka主题: tb.core.*
流量来源:
- 传输服务 (tb-mqtt-transport, tb-http-transport等): 发送设备数据到核心服务
- 规则引擎 (tb-rule-engine): 返回规则链处理结果
- tb-vc-executor: 版本控制相关消息
消息类型:
ToCoreMsg: 传输服务发送的消息ToCoreNotificationMsg: 通知消息TransportToDeviceActorMsg: 设备Actor消息
2.3 WebSocket 入口
入口点:
application/src/main/java/org/thingsboard/server/controller/WebSocketController.java
端口: 8080 (与HTTP同一端口)
流量来源:
- tb-web-ui: Web前端通过WebSocket接收实时数据推送
3. 流量处理流程
3.1 REST API 请求处理流程
用户请求 (HTTP)
↓
HAProxy (可选)
↓
Spring MVC DispatcherServlet
↓
@RestController (例如: DeviceController)
↓
Service层 (例如: DeviceService)
↓
DAO层 (例如: DeviceDao)
↓
PostgreSQL/Cassandra (数据库操作)
↓
返回响应 (HTTP Response)
核心代码模块:
-
ThingsboardServerApplication (
application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java)- Spring Boot主类
- 启动整个应用
- 组件扫描:
org.thingsboard.server
-
Controller层 (
application/src/main/java/org/thingsboard/server/controller/)DeviceController.java: 设备管理APITenantController.java: 租户管理APIUserController.java: 用户管理APIWebSocketController.java: WebSocket连接管理
-
Service层 (
application/src/main/java/org/thingsboard/server/service/)DeviceService: 设备业务逻辑TenantService: 租户业务逻辑UserService: 用户业务逻辑
-
DAO层 (
dao/src/main/java/org/thingsboard/server/dao/)DeviceDao: 设备数据访问TenantDao: 租户数据访问UserDao: 用户数据访问
3.2 Kafka 消息处理流程
Kafka (tb.core.* 主题)
↓
DefaultTbCoreConsumerService.processMsgs()
↓
消息类型判断
├─ ToDeviceActorMsg → DeviceActor (Actor系统)
├─ ToSubscriptionMgrMsg → SubscriptionManagerService
├─ DeviceStateServiceMsg → DeviceStateService
└─ 其他消息类型...
↓
Actor系统处理
├─ TenantActor (租户级别Actor)
│ └─ DeviceActor (设备级别Actor)
└─ RuleChainActor (规则链Actor)
↓
业务处理
├─ 设备状态更新
├─ 属性更新
├─ 遥测数据存储
└─ 推送到规则引擎
↓
Kafka (tb.rule-engine.* 主题) → 规则引擎
核心代码模块:
-
DefaultTbCoreConsumerService (
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java)- 功能: 消费Kafka消息并分发到不同的处理服务
- 关键方法:
processMsgs(): 批量处理消息forwardToDeviceActor(): 转发消息到DeviceActorforwardToSubMgrService(): 转发消息到订阅管理器forwardToStateService(): 转发消息到状态服务
-
Actor系统 (
application/src/main/java/org/thingsboard/server/actors/)- AppActor: 应用级别Actor,管理所有TenantActor
- TenantActor: 租户级别Actor,管理租户下的设备和规则链
- DeviceActor: 设备级别Actor,处理单个设备的消息
- RuleChainActor: 规则链Actor,管理规则链执行
-
DeviceActor (
application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java)- 功能: 处理设备相关的所有消息
- 处理消息类型:
TransportToDeviceActorMsg: 传输层消息(遥测、属性、RPC响应)DeviceAttributesUpdateMsg: 设备属性更新DeviceRpcRequestMsg: 设备RPC请求
-
TenantActor (
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java)- 功能: 租户级别消息路由
- 关键方法:
onToDeviceActorMsg(): 路由消息到DeviceActoronQueueToRuleEngineMsg(): 推送消息到规则引擎
3.3 设备数据上报流程(从传输服务)
设备 → tb-mqtt-transport (MQTT消息)
↓
MqttTransportHandler.processDevicePublish()
↓
DefaultTransportService.process() (PostTelemetryMsg/PostAttributeMsg)
↓
Kafka (ToCoreMsg) → tb.core.* 主题
↓
tb-core: DefaultTbCoreConsumerService.processMsgs()
↓
forwardToDeviceActor() → DeviceActor
↓
DeviceActor.process() → 处理遥测/属性数据
↓
推送消息到规则引擎: pushMsgToRuleEngine()
↓
Kafka (ToRuleEngineMsg) → tb.rule-engine.* 主题
↓
tb-rule-engine 处理
核心代码模块:
-
DefaultTransportService (
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java)- 功能: 传输服务统一接口,处理设备上报数据
- 关键方法:
process(PostTelemetryMsg): 处理遥测数据process(PostAttributeMsg): 处理属性数据sendToCore(): 发送消息到核心服务(通过Kafka)
-
DefaultTbClusterService (
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java)- 功能: 集群服务,负责消息路由和推送到规则引擎
- 关键方法:
pushMsgToRuleEngine(): 推送消息到规则引擎队列
3.4 规则引擎消息返回流程
tb-rule-engine 处理完成
↓
Kafka (ToCoreMsg) → tb.core.* 主题
↓
tb-core: DefaultTbCoreConsumerService.processMsgs()
↓
forwardToDeviceActor() → DeviceActor
↓
DeviceActor: 处理规则引擎返回的消息
├─ 属性更新 → 通知传输服务
├─ RPC请求 → 发送到设备
└─ 告警 → 保存到数据库
↓
Kafka (ToTransportMsg) → tb.transport.* 主题 (如果需要推送到设备)
4. 流量出口
4.1 HTTP响应出口
出口点: REST Controller返回响应
流量去向:
- tb-web-ui: Web前端接收API响应
- 外部API客户端: 第三方系统接收响应
4.2 Kafka消息出口
出口点:
DefaultTbClusterService.pushMsgToRuleEngine(): 推送到规则引擎DefaultTbCoreToTransportService.process(): 推送到传输服务
Kafka主题:
tb.rule-engine.*: 发送到规则引擎tb.transport.*: 发送到传输服务
流量去向:
- tb-rule-engine: 规则链处理
- 传输服务: 推送到设备(RPC请求、属性更新等)
4.3 数据库写入出口
出口点: DAO层执行数据库操作
流量去向:
- PostgreSQL: 实体数据(设备、用户、租户等)
- Cassandra: 时序数据(遥测数据,混合模式)
4.4 缓存写入出口
出口点: Cache服务
流量去向:
- Valkey: 设备属性缓存、会话信息缓存
5. 核心代码模块详解
5.1 ThingsboardServerApplication
位置: application/src/main/java/org/thingsboard/server/ThingsboardServerApplication.java
功能:
- Spring Boot应用入口
- 组件扫描配置
- 应用启动完成后记录启动时间
关键代码:
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server", "org.thingsboard.script"})
public class ThingsboardServerApplication {
public static void main(String[] args) {
SpringApplication.run(ThingsboardServerApplication.class, updateArguments(args));
}
}
5.2 DefaultTbCoreConsumerService
位置: application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
功能:
- 消费Kafka中的
ToCoreMsg消息 - 根据消息类型分发到不同的处理服务
- 管理消息处理的生命周期
关键方法:
-
processMsgs(): 批量处理消息
private void processMsgs(List<TbProtoQueueMsg<ToCoreMsg>> msgs, ...) { // 根据消息类型分发 if (toCoreMsg.hasToDeviceActorMsg()) { forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback); } else if (toCoreMsg.hasToSubscriptionMgrMsg()) { forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback); } // ... } -
forwardToDeviceActor(): 转发消息到DeviceActor
- 通过Actor系统将消息路由到对应的设备Actor
- 使用租户ID和设备ID进行路由
-
forwardToStateService(): 转发消息到状态服务
- 处理设备连接、断开、不活动等状态事件
5.3 Actor系统
位置: application/src/main/java/org/thingsboard/server/actors/
架构:
AppActor (应用级别)
└─ TenantActor (租户级别)
├─ DeviceActor (设备级别)
└─ RuleChainActor (规则链级别)
AppActor (actors/app/AppActor.java):
- 管理所有TenantActor
- 处理分区变更、组件生命周期等系统级消息
- 路由消息到对应的TenantActor
TenantActor (actors/tenant/TenantActor.java):
- 管理租户下的设备和规则链
- 路由消息到DeviceActor或RuleChainActor
- 处理租户级别的订阅管理
DeviceActor (actors/device/DeviceActor.java):
- 处理单个设备的所有消息
- 管理设备状态、属性、遥测数据
- 处理设备RPC请求和响应
5.4 DeviceActor
位置: application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java
功能:
- 处理设备的遥测数据、属性更新
- 管理设备会话状态
- 处理设备RPC请求
- 推送消息到规则引擎
关键处理流程:
-
接收遥测数据 (
onPostTelemetry()):- 存储到数据库(PostgreSQL/Cassandra)
- 构建TbMsg消息
- 推送到规则引擎
-
接收属性更新 (
onPostAttributes()):- 更新设备属性
- 触发属性更新订阅
- 推送到规则引擎
-
推送到规则引擎 (
pushToRuleEngine()):- 构建ToRuleEngineMsg
- 发送到Kafka的tb.rule-engine.*主题
5.5 DefaultTbClusterService
位置: application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
功能:
- 集群级别的消息路由
- 推送到规则引擎
- 推送到核心服务(跨节点)
关键方法:
- pushMsgToRuleEngine(): 推送消息到规则引擎
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { // 解析规则引擎配置 // 发送到Kafka的tb.rule-engine.*主题 ruleEngineProducerService.sendToRuleEngine(...); }
5.6 DefaultTbCoreToTransportService
位置: application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java
功能:
- 核心服务向传输服务发送消息
- 发送RPC请求、属性更新通知等到设备
关键方法:
- process(): 发送消息到传输服务
public void process(String nodeId, ToTransportMsg msg, ...) { // 获取传输服务节点主题 TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId); // 发送到Kafka tbTransportProducer.send(tpi, queueMsg, callback); }
6. 数据流向图
┌─────────────────────────────────────────────────────────────────┐
│ 流量入口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. HTTP REST API (端口8080) │
│ ├─ tb-web-ui (前端请求) │
│ ├─ HAProxy (负载均衡) │
│ └─ 外部API客户端 │
│ │
│ 2. Kafka (tb.core.* 主题) │
│ ├─ tb-mqtt-transport (设备数据) │
│ ├─ tb-http-transport (设备数据) │
│ ├─ tb-rule-engine (规则引擎结果) │
│ └─ tb-vc-executor (版本控制) │
│ │
│ 3. WebSocket (端口8080) │
│ └─ tb-web-ui (实时数据推送) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 流量处理 │
├─────────────────────────────────────────────────────────────────┤
│ 1. REST API 处理 │
│ Controller → Service → DAO → 数据库 │
│ │
│ 2. Kafka 消息处理 │
│ DefaultTbCoreConsumerService │
│ ├─ forwardToDeviceActor() → DeviceActor │
│ ├─ forwardToSubMgrService() → SubscriptionManager │
│ └─ forwardToStateService() → DeviceStateService │
│ │
│ 3. Actor 系统处理 │
│ AppActor → TenantActor → DeviceActor │
│ ├─ 设备状态管理 │
│ ├─ 属性更新 │
│ ├─ 遥测数据存储 │
│ └─ 推送消息到规则引擎 │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 流量出口 │
├─────────────────────────────────────────────────────────────────┤
│ 1. HTTP 响应 │
│ → tb-web-ui / 外部API客户端 │
│ │
│ 2. Kafka 消息 │
│ → tb.rule-engine.* (规则引擎) │
│ → tb.transport.* (传输服务) │
│ │
│ 3. 数据库写入 │
│ → PostgreSQL (实体数据) │
│ → Cassandra (时序数据,混合模式) │
│ │
│ 4. 缓存写入 │
│ → Valkey (属性缓存、会话缓存) │
└─────────────────────────────────────────────────────────────────┘
7. 关键配置
7.1 环境变量配置
TB_SERVICE_TYPE=tb-core: 标识为核心服务ZOOKEEPER_URL=zookeeper:2181: ZooKeeper地址TB_QUEUE_TYPE=kafka: 队列类型TB_KAFKA_SERVERS=kafka:9092: Kafka地址SPRING_DATASOURCE_URL=jdbc:postgresql://postgres:5432/thingsboard: 数据库地址
7.2 核心配置项
queue.core.poll-interval: Kafka消息轮询间隔queue.core.consumer-per-partition: 是否每个分区一个消费者actors.rule.engine.queue.max_pending_msgs: 规则引擎队列最大待处理消息数
8. 总结
tb-core 是 ThingsBoard 的核心枢纽,负责:
- 入口多样性: HTTP REST API、Kafka消息、WebSocket
- 消息路由: 通过Actor系统将消息路由到正确的处理者
- 业务处理: 设备管理、用户管理、数据存储
- 消息推送: 推送到规则引擎进行规则链处理
- 数据持久化: 与PostgreSQL和Cassandra交互
- 缓存管理: 使用Valkey缓存设备属性
整个流程采用了Actor模型实现消息驱动的异步处理,通过Kafka实现服务间的解耦通信,保证了系统的高可用性和可扩展性。