446 lines
9.9 KiB
Markdown
446 lines
9.9 KiB
Markdown
# ThingsBoard 扩展开发指南
|
|
|
|
## 1. 概述
|
|
|
|
本文档介绍如何对 ThingsBoard 进行扩展和二次开发,包括自定义规则节点、传输协议、数据处理等。
|
|
|
|
## 2. 扩展点
|
|
|
|
### 2.1 规则引擎扩展
|
|
|
|
#### 2.1.1 自定义规则节点
|
|
|
|
**位置**: `rule-engine/rule-engine-components/`
|
|
|
|
**步骤**:
|
|
|
|
1. **创建规则节点类**:
|
|
|
|
```java
|
|
@Slf4j
|
|
@RuleNode(
|
|
type = ComponentType.ACTION,
|
|
name = "custom action",
|
|
configClazz = CustomNodeConfiguration.class,
|
|
nodeDescription = "自定义动作节点",
|
|
nodeDetails = "执行自定义动作",
|
|
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
|
configDirective = "tbActionNodeCustomConfig"
|
|
)
|
|
public class CustomActionNode implements TbNode {
|
|
|
|
@Override
|
|
public void init(TbContext ctx, TbNodeConfiguration configuration) {
|
|
// 初始化节点
|
|
}
|
|
|
|
@Override
|
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
|
// 处理消息
|
|
// 1. 获取配置
|
|
CustomNodeConfiguration config = ctx.getNodeConfiguration();
|
|
|
|
// 2. 处理消息
|
|
String data = msg.getData();
|
|
|
|
// 3. 发送到下一个节点
|
|
ctx.tellNext(msg, "Success");
|
|
}
|
|
|
|
@Override
|
|
public void destroy() {
|
|
// 清理资源
|
|
}
|
|
}
|
|
```
|
|
|
|
2. **创建配置类**:
|
|
|
|
```java
|
|
@Data
|
|
@NoArgsConstructor
|
|
@AllArgsConstructor
|
|
public class CustomNodeConfiguration implements NodeConfiguration<CustomNodeConfiguration> {
|
|
|
|
private String customParam;
|
|
|
|
@Override
|
|
public CustomNodeConfiguration defaultConfiguration() {
|
|
CustomNodeConfiguration configuration = new CustomNodeConfiguration();
|
|
configuration.setCustomParam("default");
|
|
return configuration;
|
|
}
|
|
}
|
|
```
|
|
|
|
3. **注册节点**:
|
|
|
|
在 `resources/META-INF/services/org.thingsboard.rule.engine.RuleNode` 文件中添加:
|
|
|
|
```
|
|
org.thingsboard.rule.engine.action.CustomActionNode
|
|
```
|
|
|
|
#### 2.1.2 自定义规则节点类型
|
|
|
|
支持的节点类型:
|
|
- **FILTER**: 过滤节点
|
|
- **ENRICHMENT**: 丰富节点
|
|
- **TRANSFORMATION**: 转换节点
|
|
- **ACTION**: 动作节点
|
|
- **EXTERNAL**: 外部节点
|
|
|
|
### 2.2 传输协议扩展
|
|
|
|
#### 2.2.1 添加新的传输协议
|
|
|
|
**位置**: `common/transport/`
|
|
|
|
**步骤**:
|
|
|
|
1. **创建传输模块**:
|
|
|
|
```java
|
|
@TbTransportComponent
|
|
@Service
|
|
public class CustomTransportService implements TransportService {
|
|
|
|
@Override
|
|
public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg,
|
|
TransportServiceCallback<Void> callback) {
|
|
// 处理遥测数据
|
|
}
|
|
|
|
@Override
|
|
public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg,
|
|
TransportServiceCallback<Void> callback) {
|
|
// 处理属性数据
|
|
}
|
|
|
|
// 实现其他 TransportService 接口方法
|
|
}
|
|
```
|
|
|
|
2. **创建传输处理器**:
|
|
|
|
```java
|
|
public class CustomTransportHandler {
|
|
|
|
public void handleConnect(ChannelHandlerContext ctx, Object msg) {
|
|
// 处理连接
|
|
}
|
|
|
|
public void handleMessage(ChannelHandlerContext ctx, Object msg) {
|
|
// 处理消息
|
|
}
|
|
}
|
|
```
|
|
|
|
3. **配置传输服务**:
|
|
|
|
在 `application.yml` 中配置:
|
|
|
|
```yaml
|
|
transport:
|
|
custom:
|
|
enabled: true
|
|
bind_address: 0.0.0.0
|
|
bind_port: 8888
|
|
```
|
|
|
|
### 2.3 数据存储扩展
|
|
|
|
#### 2.3.1 自定义 DAO
|
|
|
|
**位置**: `dao/src/main/java/org/thingsboard/server/dao/`
|
|
|
|
**步骤**:
|
|
|
|
1. **创建 DAO 接口**:
|
|
|
|
```java
|
|
public interface CustomEntityDao {
|
|
CustomEntity save(TenantId tenantId, CustomEntity entity);
|
|
CustomEntity findById(TenantId tenantId, UUID id);
|
|
void removeById(TenantId tenantId, UUID id);
|
|
}
|
|
```
|
|
|
|
2. **实现 DAO**:
|
|
|
|
```java
|
|
@Repository
|
|
@SqlDao
|
|
public class JpaCustomEntityDao implements CustomEntityDao {
|
|
|
|
@Autowired
|
|
private CustomEntityRepository repository;
|
|
|
|
@Override
|
|
public CustomEntity save(TenantId tenantId, CustomEntity entity) {
|
|
return repository.save(entity);
|
|
}
|
|
|
|
// 实现其他方法
|
|
}
|
|
```
|
|
|
|
3. **创建 Repository**:
|
|
|
|
```java
|
|
public interface CustomEntityRepository extends JpaRepository<CustomEntity, UUID> {
|
|
Optional<CustomEntity> findByTenantIdAndId(TenantId tenantId, UUID id);
|
|
}
|
|
```
|
|
|
|
### 2.4 REST API 扩展
|
|
|
|
#### 2.4.1 添加自定义 API
|
|
|
|
**位置**: `application/src/main/java/org/thingsboard/server/controller/`
|
|
|
|
**步骤**:
|
|
|
|
```java
|
|
@RestController
|
|
@RequestMapping("/api/custom")
|
|
@Slf4j
|
|
public class CustomController extends BaseController {
|
|
|
|
@Autowired
|
|
private CustomService customService;
|
|
|
|
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
|
|
@RequestMapping(value = "/{entityId}", method = RequestMethod.GET)
|
|
@ResponseBody
|
|
public CustomEntity getCustomEntity(@PathVariable("entityId") String strEntityId)
|
|
throws ThingsboardException {
|
|
checkParameter("entityId", strEntityId);
|
|
try {
|
|
UUID entityId = UUID.fromString(strEntityId);
|
|
return customService.findCustomEntityById(getCurrentUser().getTenantId(), entityId);
|
|
} catch (Exception e) {
|
|
throw handleException(e);
|
|
}
|
|
}
|
|
|
|
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
|
@RequestMapping(method = RequestMethod.POST)
|
|
@ResponseBody
|
|
public CustomEntity saveCustomEntity(@RequestBody CustomEntity entity)
|
|
throws ThingsboardException {
|
|
checkNotNull(entity);
|
|
try {
|
|
entity.setTenantId(getCurrentUser().getTenantId());
|
|
return customService.saveCustomEntity(entity);
|
|
} catch (Exception e) {
|
|
throw handleException(e);
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## 3. 构建和部署
|
|
|
|
### 3.1 构建项目
|
|
|
|
```bash
|
|
# 编译项目
|
|
mvn clean install -DskipTests
|
|
|
|
# 打包
|
|
mvn package -DskipTests
|
|
```
|
|
|
|
### 3.2 部署扩展
|
|
|
|
#### 3.2.1 单体模式
|
|
|
|
1. 将编译后的 JAR 文件替换原有的 JAR
|
|
2. 重启服务
|
|
|
|
#### 3.2.2 微服务模式
|
|
|
|
1. 构建自定义 Docker 镜像
|
|
2. 更新 docker-compose.yml
|
|
3. 重启服务
|
|
|
|
```dockerfile
|
|
FROM thingsboard/tb-node:latest
|
|
COPY target/thingsboard-*.jar /usr/share/thingsboard/bin/thingsboard.jar
|
|
```
|
|
|
|
## 4. 最佳实践
|
|
|
|
### 4.1 代码组织
|
|
|
|
- 遵循 ThingsBoard 的包结构
|
|
- 使用 Spring 的依赖注入
|
|
- 实现适当的接口
|
|
|
|
### 4.2 错误处理
|
|
|
|
```java
|
|
try {
|
|
// 业务逻辑
|
|
} catch (Exception e) {
|
|
log.error("Error processing message", e);
|
|
ctx.tellFailure(msg, e);
|
|
}
|
|
```
|
|
|
|
### 4.3 日志记录
|
|
|
|
```java
|
|
log.trace("Processing message: {}", msg);
|
|
log.debug("Configuration: {}", config);
|
|
log.info("Node initialized");
|
|
log.warn("Warning message");
|
|
log.error("Error message", exception);
|
|
```
|
|
|
|
### 4.4 配置管理
|
|
|
|
- 使用 `@Value` 注解注入配置
|
|
- 提供默认配置
|
|
- 验证配置参数
|
|
|
|
### 4.5 测试
|
|
|
|
```java
|
|
@Test
|
|
public void testCustomNode() {
|
|
// 创建测试上下文
|
|
TbContext ctx = mock(TbContext.class);
|
|
|
|
// 创建测试消息
|
|
TbMsg msg = TbMsg.newMsg()
|
|
.type(TbMsgType.POST_TELEMETRY_REQUEST)
|
|
.data("{\"temperature\":25}")
|
|
.build();
|
|
|
|
// 测试节点
|
|
CustomActionNode node = new CustomActionNode();
|
|
node.init(ctx, config);
|
|
node.onMsg(ctx, msg);
|
|
|
|
// 验证结果
|
|
verify(ctx).tellNext(msg, "Success");
|
|
}
|
|
```
|
|
|
|
## 5. 常见扩展场景
|
|
|
|
### 5.1 集成外部系统
|
|
|
|
**示例**: 发送数据到外部 API
|
|
|
|
```java
|
|
@Override
|
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
|
String data = msg.getData();
|
|
|
|
// 调用外部 API
|
|
RestTemplate restTemplate = new RestTemplate();
|
|
ResponseEntity<String> response = restTemplate.postForEntity(
|
|
"https://api.example.com/data",
|
|
data,
|
|
String.class
|
|
);
|
|
|
|
if (response.getStatusCode().is2xxSuccessful()) {
|
|
ctx.tellNext(msg, "Success");
|
|
} else {
|
|
ctx.tellFailure(msg, new RuntimeException("API call failed"));
|
|
}
|
|
}
|
|
```
|
|
|
|
### 5.2 数据转换
|
|
|
|
**示例**: 转换消息格式
|
|
|
|
```java
|
|
@Override
|
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
|
JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData());
|
|
|
|
// 转换数据
|
|
ObjectNode newNode = JacksonUtil.newObjectNode();
|
|
newNode.put("deviceId", msg.getOriginator().getId().toString());
|
|
newNode.put("timestamp", System.currentTimeMillis());
|
|
newNode.set("data", jsonNode);
|
|
|
|
// 创建新消息
|
|
TbMsg newMsg = TbMsg.transformMsg(msg, msg.getType(),
|
|
msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(newNode));
|
|
|
|
ctx.tellNext(newMsg, "Success");
|
|
}
|
|
```
|
|
|
|
### 5.3 条件过滤
|
|
|
|
**示例**: 基于条件过滤消息
|
|
|
|
```java
|
|
@Override
|
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
|
JsonNode jsonNode = JacksonUtil.toJsonNode(msg.getData());
|
|
double temperature = jsonNode.get("temperature").asDouble();
|
|
|
|
if (temperature > config.getThreshold()) {
|
|
ctx.tellNext(msg, "High");
|
|
} else {
|
|
ctx.tellNext(msg, "Normal");
|
|
}
|
|
}
|
|
```
|
|
|
|
## 6. 调试技巧
|
|
|
|
### 6.1 本地调试
|
|
|
|
1. 使用 IDE 调试器
|
|
2. 设置断点
|
|
3. 查看变量值
|
|
|
|
### 6.2 日志调试
|
|
|
|
```java
|
|
log.trace("Message data: {}", msg.getData());
|
|
log.debug("Configuration: {}", JacksonUtil.toString(config));
|
|
```
|
|
|
|
### 6.3 测试数据
|
|
|
|
使用 ThingsBoard 的测试工具发送测试数据。
|
|
|
|
## 7. 注意事项
|
|
|
|
1. **版本兼容性**: 确保扩展与 ThingsBoard 版本兼容
|
|
2. **线程安全**: 注意多线程环境下的线程安全
|
|
3. **资源管理**: 及时释放资源,避免内存泄漏
|
|
4. **性能考虑**: 避免阻塞操作,使用异步处理
|
|
5. **错误处理**: 妥善处理异常,避免影响其他节点
|
|
|
|
## 8. 参考资源
|
|
|
|
- [ThingsBoard 官方文档](https://thingsboard.io/docs)
|
|
- [规则引擎文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/re-getting-started/)
|
|
- [ThingsBoard GitHub](https://github.com/thingsboard/thingsboard)
|
|
|
|
## 9. 总结
|
|
|
|
通过本文档,您可以:
|
|
|
|
1. 了解 ThingsBoard 的扩展点
|
|
2. 创建自定义规则节点
|
|
3. 添加新的传输协议
|
|
4. 扩展数据存储
|
|
5. 添加 REST API
|
|
6. 进行二次开发
|
|
|
|
遵循最佳实践,可以确保扩展的稳定性和可维护性。
|
|
|