# ThingsBoard 模块交互与通信机制分析 ## 1. 概述 ThingsBoard 采用基于消息队列的异步通信机制,实现模块间的解耦和高可用性。核心通信协议使用 Protobuf 进行消息序列化,通过 Kafka、RabbitMQ 等消息队列系统进行消息传递。 ## 2. 通信架构 ### 2.1 消息队列架构 ThingsBoard 使用消息队列作为模块间通信的中间件,支持以下队列系统: - **Kafka**: 高吞吐量分布式消息队列(推荐生产环境) - **RabbitMQ**: 企业级消息队列 - **AWS SQS**: Amazon 云消息队列 - **Google Pub/Sub**: Google 云消息队列 - **Azure Service Bus**: Azure 云消息队列 - **In-Memory**: 内存队列(仅用于测试) ### 2.2 消息类型 根据消息的目标服务,ThingsBoard 定义了以下主要消息类型: 1. **ToCoreMsg**: 发送到核心服务的消息 2. **ToCoreNotificationMsg**: 发送到核心服务的高优先级通知消息 3. **ToRuleEngineMsg**: 发送到规则引擎的消息 4. **ToRuleEngineNotificationMsg**: 发送到规则引擎的通知消息 5. **ToTransportMsg**: 发送到传输层的消息 6. **ToTransportNotificationMsg**: 发送到传输层的通知消息 ### 2.3 服务类型 ThingsBoard 定义了以下服务类型(ServiceType): - **TB_CORE**: 核心服务 - **TB_RULE_ENGINE**: 规则引擎服务 - **TB_TRANSPORT**: 传输服务 - **TB_EDQS**: 事件驱动查询服务 ## 3. 数据流分析 ### 3.1 设备遥测数据流 设备遥测数据的完整流程如下: ``` 设备 -> 传输层 -> 消息队列 -> 核心服务 -> 规则引擎 -> 数据存储 -> WebSocket -> 前端UI ``` #### 3.1.1 传输层处理 **位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java` **关键方法**: `process(PostTelemetryMsg)` ```java // 处理设备遥测数据 public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) { // 1. 验证数据点限制 if (checkLimits(sessionInfo, msg, callback, dataPoints)) { // 2. 记录设备活动 recordActivityInternal(sessionInfo); // 3. 提取租户ID和设备ID TenantId tenantId = getTenantId(sessionInfo); DeviceId deviceId = new DeviceId(...); // 4. 构建消息并发送到规则引擎 for (TsKvListProto tsKv : msg.getTsKvListList()) { TbMsg tbMsg = TbMsg.newMsg() .queueName(queueName) .type(TbMsgType.POST_TELEMETRY_REQUEST) .originator(deviceId) .data(json) .ruleChainId(ruleChainId) .build(); // 5. 发送到规则引擎队列 ruleEngineProducerService.sendToRuleEngine(...); } } } ``` **消息发送**: 传输层通过 `sendToRuleEngine()` 方法将消息发送到规则引擎队列。 #### 3.1.2 核心服务消费 **位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java` 核心服务消费来自传输层的消息: ```java // 处理来自传输层的消息 private void processToCoreMsg(ToCoreMsg toCoreMsg, TbCallback callback) { if (toCoreMsg.hasToDeviceActorMsg()) { // 转发到设备 Actor TransportToDeviceActorMsg msg = toCoreMsg.getToDeviceActorMsg(); actorContext.tell(msg); } // ... 其他消息类型处理 } ``` #### 3.1.3 Actor 系统处理 **位置**: `application/src/main/java/org/thingsboard/server/actors/` Actor 层次结构: 1. **AppActor**: 系统级 Actor - 管理所有租户 Actor - 处理系统级消息 2. **TenantActor**: 租户级 Actor - 管理租户下的所有设备 Actor - 处理租户级消息 3. **DeviceActor**: 设备级 Actor - 处理单个设备的消息 - 管理设备状态和会话 **消息路由**: ```java // AppActor 路由到 TenantActor private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse( tenantActor -> tenantActor.tell(msg), () -> msg.getCallback().onSuccess() ); } // TenantActor 路由到 DeviceActor private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) { TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId()); deviceActor.tell(msg); } ``` #### 3.1.4 规则引擎处理 **位置**: `application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java` ```java // 处理发送到规则引擎的消息 private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { TbMsg tbMsg = msg.getMsg(); if (tbMsg.getRuleChainId() == null) { // 使用根规则链 getRootChainActor().tell(msg); } else { // 使用指定的规则链 ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg); } } ``` 规则引擎处理消息后,可以通过规则节点执行各种操作: - 保存遥测数据到数据库 - 触发告警 - 发送通知 - 调用外部服务 ### 3.2 设备到核心服务的通信 #### 3.2.1 消息类型 传输层向核心服务发送的消息类型(ToCoreMsg): - `TransportToDeviceActorMsg`: 设备 Actor 消息 - `DeviceStateServiceMsgProto`: 设备状态服务消息 - `DeviceConnectProto`: 设备连接消息 - `DeviceDisconnectProto`: 设备断开消息 - `DeviceInactivityProto`: 设备不活动消息 #### 3.2.2 发送流程 **位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java` ```java // 发送消息到核心服务 private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, ...) { // 1. 解析目标分区 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); // 2. 发送消息 tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback); } ``` ### 3.3 核心服务到传输层的通信 #### 3.3.1 消息类型 核心服务向传输层发送的消息类型(ToTransportMsg): - `SessionCloseNotificationProto`: 会话关闭通知 - `GetAttributeResponseMsg`: 获取属性响应 - `AttributeUpdateNotificationMsg`: 属性更新通知 - `ToDeviceRpcRequestMsg`: 设备 RPC 请求 - `ToServerRpcResponseMsg`: 服务器 RPC 响应 - `EntityUpdateMsg`: 实体更新消息 - `EntityDeleteMsg`: 实体删除消息 #### 3.3.2 发送流程 **位置**: `application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java` ```java // 核心服务发送消息到传输层 public void process(String nodeId, ToTransportMsg msg, Runnable onSuccess, Consumer onFailure) { // 1. 获取传输服务节点主题 TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, nodeId); // 2. 发送消息 TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(NULL_UUID, msg); tbTransportProducer.send(tpi, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure)); } ``` ### 3.4 规则引擎消息流 #### 3.4.1 消息发送到规则引擎 **位置**: `application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java` ```java // 推送消息到规则引擎 public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { // 1. 解析目标分区 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); // 2. 构建 ToRuleEngineMsg ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsgProto(TbMsgProto.newBuilder()...) .build(); // 3. 发送消息 producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(...), callback); } ``` #### 3.4.2 规则引擎消费消息 规则引擎服务消费来自队列的消息,并通过 Actor 系统路由到相应的规则链进行处理。 ## 4. 服务发现与负载均衡 ### 4.1 服务发现 ThingsBoard 使用 ZooKeeper 进行服务发现: - **服务注册**: 每个服务启动时在 ZooKeeper 注册 - **服务发现**: 通过 ZooKeeper 发现其他服务实例 - **健康检查**: 定期检查服务健康状态 ### 4.2 分区服务 **位置**: `common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java` 分区服务负责: 1. **分区解析**: 根据租户ID和实体ID解析目标分区 2. **负载均衡**: 在多个服务实例间分配负载 3. **分区管理**: 管理分区变更事件 ```java // 解析目标分区 TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId) { // 根据租户ID和实体ID计算分区 // 返回对应的 TopicPartitionInfo } ``` ### 4.3 主题服务 **位置**: `common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java` 主题服务负责: 1. **主题管理**: 管理消息队列主题 2. **通知主题**: 为每个服务实例创建通知主题 ```java // 获取服务通知主题 TopicPartitionInfo getNotificationsTopic(ServiceType serviceType, String serviceId) { // 返回服务实例的通知主题 } ``` ## 5. 消息序列化 ### 5.1 Protobuf 协议 ThingsBoard 使用 Protobuf 进行消息序列化,定义文件位于: - `common/proto/src/main/proto/queue.proto`: 队列消息定义 - `common/proto/src/main/proto/transport.proto`: 传输协议定义 ### 5.2 消息包装 所有队列消息都包装在 `TbProtoQueueMsg` 中: ```java public class TbProtoQueueMsg implements TbQueueMsg { private final UUID key; // 路由键 private final T value; // Protobuf 消息 } ``` ## 6. 异步回调机制 ### 6.1 回调接口 **位置**: `common/queue/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java` ```java public interface TbQueueCallback { void onSuccess(TbQueueMsgMetadata metadata); void onFailure(Throwable t); } ``` ### 6.2 回调使用 消息发送时可以提供回调: ```java // 发送消息并注册回调 producer.send(tpi, msg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { // 处理成功 } @Override public void onFailure(Throwable t) { // 处理失败 } }); ``` ## 7. 错误处理与重试 ### 7.1 消息确认 - 消息成功处理后发送确认 - 消息处理失败时根据配置进行重试 ### 7.2 死信队列 处理失败的消息可以发送到死信队列进行后续处理。 ## 8. 性能优化 ### 8.1 批量处理 消息可以批量处理以提高吞吐量: ```java // 批量处理消息 void process(List> msgs, TbCallback callback) { // 批量处理逻辑 } ``` ### 8.2 分区策略 - 按租户ID分区:确保同一租户的消息在同一分区 - 按实体ID分区:确保同一实体的消息在同一分区 ## 9. 总结 ThingsBoard 的模块交互机制具有以下特点: 1. **异步通信**: 基于消息队列的异步通信,提高系统吞吐量 2. **解耦设计**: 模块间通过消息队列解耦,易于扩展和维护 3. **高可用性**: 支持多实例部署,通过分区服务实现负载均衡 4. **协议统一**: 使用 Protobuf 进行消息序列化,提高性能 5. **灵活扩展**: 支持多种消息队列实现,可根据需求选择 这种设计使得 ThingsBoard 能够处理大规模的物联网设备连接和数据流,同时保持系统的稳定性和可扩展性。