9.9 KiB
9.9 KiB
ThingsBoard 扩展开发指南
1. 概述
本文档介绍如何对 ThingsBoard 进行扩展和二次开发,包括自定义规则节点、传输协议、数据处理等。
2. 扩展点
2.1 规则引擎扩展
2.1.1 自定义规则节点
位置: rule-engine/rule-engine-components/
步骤:
- 创建规则节点类:
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "custom action",
configClazz = CustomNodeConfiguration.class,
nodeDescription = "自定义动作节点",
nodeDetails = "执行自定义动作",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeCustomConfig"
)
public class CustomActionNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) {
// 初始化节点
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
// 处理消息
// 1. 获取配置
CustomNodeConfiguration config = ctx.getNodeConfiguration();
// 2. 处理消息
String data = msg.getData();
// 3. 发送到下一个节点
ctx.tellNext(msg, "Success");
}
@Override
public void destroy() {
// 清理资源
}
}
- 创建配置类:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CustomNodeConfiguration implements NodeConfiguration<CustomNodeConfiguration> {
private String customParam;
@Override
public CustomNodeConfiguration defaultConfiguration() {
CustomNodeConfiguration configuration = new CustomNodeConfiguration();
configuration.setCustomParam("default");
return configuration;
}
}
- 注册节点:
在 resources/META-INF/services/org.thingsboard.rule.engine.RuleNode 文件中添加:
org.thingsboard.rule.engine.action.CustomActionNode
2.1.2 自定义规则节点类型
支持的节点类型:
- FILTER: 过滤节点
- ENRICHMENT: 丰富节点
- TRANSFORMATION: 转换节点
- ACTION: 动作节点
- EXTERNAL: 外部节点
2.2 传输协议扩展
2.2.1 添加新的传输协议
位置: common/transport/
步骤:
- 创建传输模块:
@TbTransportComponent
@Service
public class CustomTransportService implements TransportService {
@Override
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg,
TransportServiceCallback<Void> callback) {
// 处理遥测数据
}
@Override
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg,
TransportServiceCallback<Void> callback) {
// 处理属性数据
}
// 实现其他 TransportService 接口方法
}
- 创建传输处理器:
public class CustomTransportHandler {
public void handleConnect(ChannelHandlerContext ctx, Object msg) {
// 处理连接
}
public void handleMessage(ChannelHandlerContext ctx, Object msg) {
// 处理消息
}
}
- 配置传输服务:
在 application.yml 中配置:
transport:
custom:
enabled: true
bind_address: 0.0.0.0
bind_port: 8888
2.3 数据存储扩展
2.3.1 自定义 DAO
位置: dao/src/main/java/org/thingsboard/server/dao/
步骤:
- 创建 DAO 接口:
public interface CustomEntityDao {
CustomEntity save(TenantId tenantId, CustomEntity entity);
CustomEntity findById(TenantId tenantId, UUID id);
void removeById(TenantId tenantId, UUID id);
}
- 实现 DAO:
@Repository
@SqlDao
public class JpaCustomEntityDao implements CustomEntityDao {
@Autowired
private CustomEntityRepository repository;
@Override
public CustomEntity save(TenantId tenantId, CustomEntity entity) {
return repository.save(entity);
}
// 实现其他方法
}
- 创建 Repository:
public interface CustomEntityRepository extends JpaRepository<CustomEntity, UUID> {
Optional<CustomEntity> findByTenantIdAndId(TenantId tenantId, UUID id);
}
2.4 REST API 扩展
2.4.1 添加自定义 API
位置: application/src/main/java/org/thingsboard/server/controller/
步骤:
@RestController
@RequestMapping("/api/custom")
@Slf4j
public class CustomController extends BaseController {
@Autowired
private CustomService customService;
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityId}", method = RequestMethod.GET)
@ResponseBody
public CustomEntity getCustomEntity(@PathVariable("entityId") String strEntityId)
throws ThingsboardException {
checkParameter("entityId", strEntityId);
try {
UUID entityId = UUID.fromString(strEntityId);
return customService.findCustomEntityById(getCurrentUser().getTenantId(), entityId);
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public CustomEntity saveCustomEntity(@RequestBody CustomEntity entity)
throws ThingsboardException {
checkNotNull(entity);
try {
entity.setTenantId(getCurrentUser().getTenantId());
return customService.saveCustomEntity(entity);
} catch (Exception e) {
throw handleException(e);
}
}
}
3. 构建和部署
3.1 构建项目
# 编译项目
mvn clean install -DskipTests
# 打包
mvn package -DskipTests
3.2 部署扩展
3.2.1 单体模式
- 将编译后的 JAR 文件替换原有的 JAR
- 重启服务
3.2.2 微服务模式
- 构建自定义 Docker 镜像
- 更新 docker-compose.yml
- 重启服务
FROM thingsboard/tb-node:latest
COPY target/thingsboard-*.jar /usr/share/thingsboard/bin/thingsboard.jar
4. 最佳实践
4.1 代码组织
- 遵循 ThingsBoard 的包结构
- 使用 Spring 的依赖注入
- 实现适当的接口
4.2 错误处理
try {
// 业务逻辑
} catch (Exception e) {
log.error("Error processing message", e);
ctx.tellFailure(msg, e);
}
4.3 日志记录
log.trace("Processing message: {}", msg);
log.debug("Configuration: {}", config);
log.info("Node initialized");
log.warn("Warning message");
log.error("Error message", exception);
4.4 配置管理
- 使用
@Value注解注入配置 - 提供默认配置
- 验证配置参数
4.5 测试
@Test
public void testCustomNode() {
// 创建测试上下文
TbContext ctx = mock(TbContext.class);
// 创建测试消息
TbMsg msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.data("{\"temperature\":25}")
.build();
// 测试节点
CustomActionNode node = new CustomActionNode();
node.init(ctx, config);
node.onMsg(ctx, msg);
// 验证结果
verify(ctx).tellNext(msg, "Success");
}
5. 常见扩展场景
5.1 集成外部系统
示例: 发送数据到外部 API
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String data = msg.getData();
// 调用外部 API
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<String> response = restTemplate.postForEntity(
"https://api.example.com/data",
data,
String.class
);
if (response.getStatusCode().is2xxSuccessful()) {
ctx.tellNext(msg, "Success");
} else {
ctx.tellFailure(msg, new RuntimeException("API call failed"));
}
}
5.2 数据转换
示例: 转换消息格式
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData());
// 转换数据
ObjectNode newNode = JacksonUtil.newObjectNode();
newNode.put("deviceId", msg.getOriginator().getId().toString());
newNode.put("timestamp", System.currentTimeMillis());
newNode.set("data", jsonNode);
// 创建新消息
TbMsg newMsg = TbMsg.transformMsg(msg, msg.getType(),
msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(newNode));
ctx.tellNext(newMsg, "Success");
}
5.3 条件过滤
示例: 基于条件过滤消息
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData());
double temperature = jsonNode.get("temperature").asDouble();
if (temperature > config.getThreshold()) {
ctx.tellNext(msg, "High");
} else {
ctx.tellNext(msg, "Normal");
}
}
6. 调试技巧
6.1 本地调试
- 使用 IDE 调试器
- 设置断点
- 查看变量值
6.2 日志调试
log.trace("Message data: {}", msg.getData());
log.debug("Configuration: {}", JacksonUtil.toString(config));
6.3 测试数据
使用 ThingsBoard 的测试工具发送测试数据。
7. 注意事项
- 版本兼容性: 确保扩展与 ThingsBoard 版本兼容
- 线程安全: 注意多线程环境下的线程安全
- 资源管理: 及时释放资源,避免内存泄漏
- 性能考虑: 避免阻塞操作,使用异步处理
- 错误处理: 妥善处理异常,避免影响其他节点
8. 参考资源
9. 总结
通过本文档,您可以:
- 了解 ThingsBoard 的扩展点
- 创建自定义规则节点
- 添加新的传输协议
- 扩展数据存储
- 添加 REST API
- 进行二次开发
遵循最佳实践,可以确保扩展的稳定性和可维护性。