thingsboard/summary/05-扩展开发指南.md

446 lines
9.9 KiB
Markdown
Raw Permalink Normal View History

2026-01-19 11:50:37 +08:00
# ThingsBoard 扩展开发指南
## 1. 概述
本文档介绍如何对 ThingsBoard 进行扩展和二次开发,包括自定义规则节点、传输协议、数据处理等。
## 2. 扩展点
### 2.1 规则引擎扩展
#### 2.1.1 自定义规则节点
**位置**: `rule-engine/rule-engine-components/`
**步骤**:
1. **创建规则节点类**:
```java
@Slf4j
@RuleNode(
type = ComponentType.ACTION,
name = "custom action",
configClazz = CustomNodeConfiguration.class,
nodeDescription = "自定义动作节点",
nodeDetails = "执行自定义动作",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeCustomConfig"
)
public class CustomActionNode implements TbNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) {
// 初始化节点
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
// 处理消息
// 1. 获取配置
CustomNodeConfiguration config = ctx.getNodeConfiguration();
// 2. 处理消息
String data = msg.getData();
// 3. 发送到下一个节点
ctx.tellNext(msg, "Success");
}
@Override
public void destroy() {
// 清理资源
}
}
```
2. **创建配置类**:
```java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CustomNodeConfiguration implements NodeConfiguration<CustomNodeConfiguration> {
private String customParam;
@Override
public CustomNodeConfiguration defaultConfiguration() {
CustomNodeConfiguration configuration = new CustomNodeConfiguration();
configuration.setCustomParam("default");
return configuration;
}
}
```
3. **注册节点**:
`resources/META-INF/services/org.thingsboard.rule.engine.RuleNode` 文件中添加:
```
org.thingsboard.rule.engine.action.CustomActionNode
```
#### 2.1.2 自定义规则节点类型
支持的节点类型:
- **FILTER**: 过滤节点
- **ENRICHMENT**: 丰富节点
- **TRANSFORMATION**: 转换节点
- **ACTION**: 动作节点
- **EXTERNAL**: 外部节点
### 2.2 传输协议扩展
#### 2.2.1 添加新的传输协议
**位置**: `common/transport/`
**步骤**:
1. **创建传输模块**:
```java
@TbTransportComponent
@Service
public class CustomTransportService implements TransportService {
@Override
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg,
TransportServiceCallback<Void> callback) {
// 处理遥测数据
}
@Override
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg,
TransportServiceCallback<Void> callback) {
// 处理属性数据
}
// 实现其他 TransportService 接口方法
}
```
2. **创建传输处理器**:
```java
public class CustomTransportHandler {
public void handleConnect(ChannelHandlerContext ctx, Object msg) {
// 处理连接
}
public void handleMessage(ChannelHandlerContext ctx, Object msg) {
// 处理消息
}
}
```
3. **配置传输服务**:
`application.yml` 中配置:
```yaml
transport:
custom:
enabled: true
bind_address: 0.0.0.0
bind_port: 8888
```
### 2.3 数据存储扩展
#### 2.3.1 自定义 DAO
**位置**: `dao/src/main/java/org/thingsboard/server/dao/`
**步骤**:
1. **创建 DAO 接口**:
```java
public interface CustomEntityDao {
CustomEntity save(TenantId tenantId, CustomEntity entity);
CustomEntity findById(TenantId tenantId, UUID id);
void removeById(TenantId tenantId, UUID id);
}
```
2. **实现 DAO**:
```java
@Repository
@SqlDao
public class JpaCustomEntityDao implements CustomEntityDao {
@Autowired
private CustomEntityRepository repository;
@Override
public CustomEntity save(TenantId tenantId, CustomEntity entity) {
return repository.save(entity);
}
// 实现其他方法
}
```
3. **创建 Repository**:
```java
public interface CustomEntityRepository extends JpaRepository<CustomEntity, UUID> {
Optional<CustomEntity> findByTenantIdAndId(TenantId tenantId, UUID id);
}
```
### 2.4 REST API 扩展
#### 2.4.1 添加自定义 API
**位置**: `application/src/main/java/org/thingsboard/server/controller/`
**步骤**:
```java
@RestController
@RequestMapping("/api/custom")
@Slf4j
public class CustomController extends BaseController {
@Autowired
private CustomService customService;
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityId}", method = RequestMethod.GET)
@ResponseBody
public CustomEntity getCustomEntity(@PathVariable("entityId") String strEntityId)
throws ThingsboardException {
checkParameter("entityId", strEntityId);
try {
UUID entityId = UUID.fromString(strEntityId);
return customService.findCustomEntityById(getCurrentUser().getTenantId(), entityId);
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public CustomEntity saveCustomEntity(@RequestBody CustomEntity entity)
throws ThingsboardException {
checkNotNull(entity);
try {
entity.setTenantId(getCurrentUser().getTenantId());
return customService.saveCustomEntity(entity);
} catch (Exception e) {
throw handleException(e);
}
}
}
```
## 3. 构建和部署
### 3.1 构建项目
```bash
# 编译项目
mvn clean install -DskipTests
# 打包
mvn package -DskipTests
```
### 3.2 部署扩展
#### 3.2.1 单体模式
1. 将编译后的 JAR 文件替换原有的 JAR
2. 重启服务
#### 3.2.2 微服务模式
1. 构建自定义 Docker 镜像
2. 更新 docker-compose.yml
3. 重启服务
```dockerfile
FROM thingsboard/tb-node:latest
COPY target/thingsboard-*.jar /usr/share/thingsboard/bin/thingsboard.jar
```
## 4. 最佳实践
### 4.1 代码组织
- 遵循 ThingsBoard 的包结构
- 使用 Spring 的依赖注入
- 实现适当的接口
### 4.2 错误处理
```java
try {
// 业务逻辑
} catch (Exception e) {
log.error("Error processing message", e);
ctx.tellFailure(msg, e);
}
```
### 4.3 日志记录
```java
log.trace("Processing message: {}", msg);
log.debug("Configuration: {}", config);
log.info("Node initialized");
log.warn("Warning message");
log.error("Error message", exception);
```
### 4.4 配置管理
- 使用 `@Value` 注解注入配置
- 提供默认配置
- 验证配置参数
### 4.5 测试
```java
@Test
public void testCustomNode() {
// 创建测试上下文
TbContext ctx = mock(TbContext.class);
// 创建测试消息
TbMsg msg = TbMsg.newMsg()
.type(TbMsgType.POST_TELEMETRY_REQUEST)
.data("{\"temperature\":25}")
.build();
// 测试节点
CustomActionNode node = new CustomActionNode();
node.init(ctx, config);
node.onMsg(ctx, msg);
// 验证结果
verify(ctx).tellNext(msg, "Success");
}
```
## 5. 常见扩展场景
### 5.1 集成外部系统
**示例**: 发送数据到外部 API
```java
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String data = msg.getData();
// 调用外部 API
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<String> response = restTemplate.postForEntity(
"https://api.example.com/data",
data,
String.class
);
if (response.getStatusCode().is2xxSuccessful()) {
ctx.tellNext(msg, "Success");
} else {
ctx.tellFailure(msg, new RuntimeException("API call failed"));
}
}
```
### 5.2 数据转换
**示例**: 转换消息格式
```java
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData());
// 转换数据
ObjectNode newNode = JacksonUtil.newObjectNode();
newNode.put("deviceId", msg.getOriginator().getId().toString());
newNode.put("timestamp", System.currentTimeMillis());
newNode.set("data", jsonNode);
// 创建新消息
TbMsg newMsg = TbMsg.transformMsg(msg, msg.getType(),
msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(newNode));
ctx.tellNext(newMsg, "Success");
}
```
### 5.3 条件过滤
**示例**: 基于条件过滤消息
```java
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData());
double temperature = jsonNode.get("temperature").asDouble();
if (temperature > config.getThreshold()) {
ctx.tellNext(msg, "High");
} else {
ctx.tellNext(msg, "Normal");
}
}
```
## 6. 调试技巧
### 6.1 本地调试
1. 使用 IDE 调试器
2. 设置断点
3. 查看变量值
### 6.2 日志调试
```java
log.trace("Message data: {}", msg.getData());
log.debug("Configuration: {}", JacksonUtil.toString(config));
```
### 6.3 测试数据
使用 ThingsBoard 的测试工具发送测试数据。
## 7. 注意事项
1. **版本兼容性**: 确保扩展与 ThingsBoard 版本兼容
2. **线程安全**: 注意多线程环境下的线程安全
3. **资源管理**: 及时释放资源,避免内存泄漏
4. **性能考虑**: 避免阻塞操作,使用异步处理
5. **错误处理**: 妥善处理异常,避免影响其他节点
## 8. 参考资源
- [ThingsBoard 官方文档](https://thingsboard.io/docs)
- [规则引擎文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/)
- [ThingsBoard GitHub](https://github.com/thingsboard/thingsboard)
## 9. 总结
通过本文档,您可以:
1. 了解 ThingsBoard 的扩展点
2. 创建自定义规则节点
3. 添加新的传输协议
4. 扩展数据存储
5. 添加 REST API
6. 进行二次开发
遵循最佳实践,可以确保扩展的稳定性和可维护性。