# ThingsBoard 扩展开发指南 ## 1. 概述 本文档介绍如何对 ThingsBoard 进行扩展和二次开发,包括自定义规则节点、传输协议、数据处理等。 ## 2. 扩展点 ### 2.1 规则引擎扩展 #### 2.1.1 自定义规则节点 **位置**: `rule-engine/rule-engine-components/` **步骤**: 1. **创建规则节点类**: ```java @Slf4j @RuleNode( type = ComponentType.ACTION, name = "custom action", configClazz = CustomNodeConfiguration.class, nodeDescription = "自定义动作节点", nodeDetails = "执行自定义动作", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeCustomConfig" ) public class CustomActionNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) { // 初始化节点 } @Override public void onMsg(TbContext ctx, TbMsg msg) { // 处理消息 // 1. 获取配置 CustomNodeConfiguration config = ctx.getNodeConfiguration(); // 2. 处理消息 String data = msg.getData(); // 3. 发送到下一个节点 ctx.tellNext(msg, "Success"); } @Override public void destroy() { // 清理资源 } } ``` 2. **创建配置类**: ```java @Data @NoArgsConstructor @AllArgsConstructor public class CustomNodeConfiguration implements NodeConfiguration { private String customParam; @Override public CustomNodeConfiguration defaultConfiguration() { CustomNodeConfiguration configuration = new CustomNodeConfiguration(); configuration.setCustomParam("default"); return configuration; } } ``` 3. **注册节点**: 在 `resources/META-INF/services/org.thingsboard.rule.engine.RuleNode` 文件中添加: ``` org.thingsboard.rule.engine.action.CustomActionNode ``` #### 2.1.2 自定义规则节点类型 支持的节点类型: - **FILTER**: 过滤节点 - **ENRICHMENT**: 丰富节点 - **TRANSFORMATION**: 转换节点 - **ACTION**: 动作节点 - **EXTERNAL**: 外部节点 ### 2.2 传输协议扩展 #### 2.2.1 添加新的传输协议 **位置**: `common/transport/` **步骤**: 1. **创建传输模块**: ```java @TbTransportComponent @Service public class CustomTransportService implements TransportService { @Override public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { // 处理遥测数据 } @Override public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { // 处理属性数据 } // 实现其他 TransportService 接口方法 } ``` 2. **创建传输处理器**: ```java public class CustomTransportHandler { public void handleConnect(ChannelHandlerContext ctx, Object msg) { // 处理连接 } public void handleMessage(ChannelHandlerContext ctx, Object msg) { // 处理消息 } } ``` 3. **配置传输服务**: 在 `application.yml` 中配置: ```yaml transport: custom: enabled: true bind_address: 0.0.0.0 bind_port: 8888 ``` ### 2.3 数据存储扩展 #### 2.3.1 自定义 DAO **位置**: `dao/src/main/java/org/thingsboard/server/dao/` **步骤**: 1. **创建 DAO 接口**: ```java public interface CustomEntityDao { CustomEntity save(TenantId tenantId, CustomEntity entity); CustomEntity findById(TenantId tenantId, UUID id); void removeById(TenantId tenantId, UUID id); } ``` 2. **实现 DAO**: ```java @Repository @SqlDao public class JpaCustomEntityDao implements CustomEntityDao { @Autowired private CustomEntityRepository repository; @Override public CustomEntity save(TenantId tenantId, CustomEntity entity) { return repository.save(entity); } // 实现其他方法 } ``` 3. **创建 Repository**: ```java public interface CustomEntityRepository extends JpaRepository { Optional findByTenantIdAndId(TenantId tenantId, UUID id); } ``` ### 2.4 REST API 扩展 #### 2.4.1 添加自定义 API **位置**: `application/src/main/java/org/thingsboard/server/controller/` **步骤**: ```java @RestController @RequestMapping("/api/custom") @Slf4j public class CustomController extends BaseController { @Autowired private CustomService customService; @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/{entityId}", method = RequestMethod.GET) @ResponseBody public CustomEntity getCustomEntity(@PathVariable("entityId") String strEntityId) throws ThingsboardException { checkParameter("entityId", strEntityId); try { UUID entityId = UUID.fromString(strEntityId); return customService.findCustomEntityById(getCurrentUser().getTenantId(), entityId); } catch (Exception e) { throw handleException(e); } } @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") @RequestMapping(method = RequestMethod.POST) @ResponseBody public CustomEntity saveCustomEntity(@RequestBody CustomEntity entity) throws ThingsboardException { checkNotNull(entity); try { entity.setTenantId(getCurrentUser().getTenantId()); return customService.saveCustomEntity(entity); } catch (Exception e) { throw handleException(e); } } } ``` ## 3. 构建和部署 ### 3.1 构建项目 ```bash # 编译项目 mvn clean install -DskipTests # 打包 mvn package -DskipTests ``` ### 3.2 部署扩展 #### 3.2.1 单体模式 1. 将编译后的 JAR 文件替换原有的 JAR 2. 重启服务 #### 3.2.2 微服务模式 1. 构建自定义 Docker 镜像 2. 更新 docker-compose.yml 3. 重启服务 ```dockerfile FROM thingsboard/tb-node:latest COPY target/thingsboard-*.jar /usr/share/thingsboard/bin/thingsboard.jar ``` ## 4. 最佳实践 ### 4.1 代码组织 - 遵循 ThingsBoard 的包结构 - 使用 Spring 的依赖注入 - 实现适当的接口 ### 4.2 错误处理 ```java try { // 业务逻辑 } catch (Exception e) { log.error("Error processing message", e); ctx.tellFailure(msg, e); } ``` ### 4.3 日志记录 ```java log.trace("Processing message: {}", msg); log.debug("Configuration: {}", config); log.info("Node initialized"); log.warn("Warning message"); log.error("Error message", exception); ``` ### 4.4 配置管理 - 使用 `@Value` 注解注入配置 - 提供默认配置 - 验证配置参数 ### 4.5 测试 ```java @Test public void testCustomNode() { // 创建测试上下文 TbContext ctx = mock(TbContext.class); // 创建测试消息 TbMsg msg = TbMsg.newMsg() .type(TbMsgType.POST_TELEMETRY_REQUEST) .data("{\"temperature\":25}") .build(); // 测试节点 CustomActionNode node = new CustomActionNode(); node.init(ctx, config); node.onMsg(ctx, msg); // 验证结果 verify(ctx).tellNext(msg, "Success"); } ``` ## 5. 常见扩展场景 ### 5.1 集成外部系统 **示例**: 发送数据到外部 API ```java @Override public void onMsg(TbContext ctx, TbMsg msg) { String data = msg.getData(); // 调用外部 API RestTemplate restTemplate = new RestTemplate(); ResponseEntity response = restTemplate.postForEntity( "https://api.example.com/data", data, String.class ); if (response.getStatusCode().is2xxSuccessful()) { ctx.tellNext(msg, "Success"); } else { ctx.tellFailure(msg, new RuntimeException("API call failed")); } } ``` ### 5.2 数据转换 **示例**: 转换消息格式 ```java @Override public void onMsg(TbContext ctx, TbMsg msg) { JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData()); // 转换数据 ObjectNode newNode = JacksonUtil.newObjectNode(); newNode.put("deviceId", msg.getOriginator().getId().toString()); newNode.put("timestamp", System.currentTimeMillis()); newNode.set("data", jsonNode); // 创建新消息 TbMsg newMsg = TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(newNode)); ctx.tellNext(newMsg, "Success"); } ``` ### 5.3 条件过滤 **示例**: 基于条件过滤消息 ```java @Override public void onMsg(TbContext ctx, TbMsg msg) { JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData()); double temperature = jsonNode.get("temperature").asDouble(); if (temperature > config.getThreshold()) { ctx.tellNext(msg, "High"); } else { ctx.tellNext(msg, "Normal"); } } ``` ## 6. 调试技巧 ### 6.1 本地调试 1. 使用 IDE 调试器 2. 设置断点 3. 查看变量值 ### 6.2 日志调试 ```java log.trace("Message data: {}", msg.getData()); log.debug("Configuration: {}", JacksonUtil.toString(config)); ``` ### 6.3 测试数据 使用 ThingsBoard 的测试工具发送测试数据。 ## 7. 注意事项 1. **版本兼容性**: 确保扩展与 ThingsBoard 版本兼容 2. **线程安全**: 注意多线程环境下的线程安全 3. **资源管理**: 及时释放资源,避免内存泄漏 4. **性能考虑**: 避免阻塞操作,使用异步处理 5. **错误处理**: 妥善处理异常,避免影响其他节点 ## 8. 参考资源 - [ThingsBoard 官方文档](https://thingsboard.io/docs) - [规则引擎文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/) - [ThingsBoard GitHub](https://github.com/thingsboard/thingsboard) ## 9. 总结 通过本文档,您可以: 1. 了解 ThingsBoard 的扩展点 2. 创建自定义规则节点 3. 添加新的传输协议 4. 扩展数据存储 5. 添加 REST API 6. 进行二次开发 遵循最佳实践,可以确保扩展的稳定性和可维护性。