424 lines
13 KiB
Markdown
424 lines
13 KiB
Markdown
# ThingsBoard 社区版 MQTT 转发功能启用指南
|
||
|
||
## 问题分析
|
||
|
||
根据源码分析,`TbMqttNode` 类本身**没有许可证检查**,代码逻辑上应该可以在社区版使用。但如果遇到无法使用的情况,可能是以下原因:
|
||
|
||
1. **UI 层面过滤**:前端可能隐藏了某些节点
|
||
2. **版本限制**:某些版本可能有限制
|
||
3. **配置问题**:节点配置不正确
|
||
|
||
## 源码分析
|
||
|
||
### 1. MQTT 节点实现
|
||
|
||
**位置**:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java`
|
||
|
||
**关键代码**:
|
||
```java
|
||
@RuleNode(
|
||
type = ComponentType.EXTERNAL,
|
||
name = "mqtt",
|
||
configClazz = TbMqttNodeConfiguration.class,
|
||
version = 2,
|
||
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
|
||
nodeDescription = "Publish messages to the MQTT broker",
|
||
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
|
||
configDirective = "tbExternalNodeMqttConfig",
|
||
icon = "call_split",
|
||
docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/mqtt/"
|
||
)
|
||
public class TbMqttNode extends TbAbstractExternalNode {
|
||
// 没有许可证检查,直接实现功能
|
||
}
|
||
```
|
||
|
||
**结论**:代码层面**没有许可证限制**,应该可以正常使用。
|
||
|
||
## 解决方案
|
||
|
||
### 方案 1:验证节点是否可用(推荐先尝试)
|
||
|
||
1. **检查节点是否已注册**
|
||
- 登录 ThingsBoard Web UI
|
||
- 进入规则链编辑器
|
||
- 尝试添加 "External" → "MQTT" 节点
|
||
- 如果能看到,说明节点可用
|
||
|
||
2. **如果节点不可见,检查日志**
|
||
```bash
|
||
# 查看 ThingsBoard 启动日志
|
||
docker logs mytb | grep -i "mqtt"
|
||
# 或
|
||
tail -f /var/log/thingsboard/thingsboard.log | grep -i "rule.*node\|mqtt"
|
||
```
|
||
|
||
### 方案 2:修改代码确保功能可用(如果确实不可用)
|
||
|
||
#### 步骤 1:确认节点类存在
|
||
|
||
检查文件是否存在:
|
||
```bash
|
||
ls -la thingsboard/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
|
||
```
|
||
|
||
#### 步骤 2:检查组件扫描配置
|
||
|
||
**文件**:`application/src/main/resources/thingsboard.yml`
|
||
|
||
查找配置:
|
||
```yaml
|
||
plugins:
|
||
scan_packages:
|
||
- "org.thingsboard.rule.engine"
|
||
```
|
||
|
||
确保 `org.thingsboard.rule.engine.mqtt` 在扫描路径内。
|
||
|
||
#### 步骤 3:检查组件注册
|
||
|
||
**文件**:`application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java`
|
||
|
||
关键方法:`registerRuleNodeComponents()`(约第 121 行)
|
||
|
||
```java
|
||
private void registerRuleNodeComponents() {
|
||
for (RuleNodeClassInfo def : ruleNodeClasses.values()) {
|
||
// ... 注册逻辑
|
||
ComponentDescriptor component = scanAndPersistComponent(def, type);
|
||
components.put(component.getClazz(), component);
|
||
putComponentIntoMaps(type, def.getAnnotation(), component);
|
||
}
|
||
}
|
||
```
|
||
|
||
**验证**:确保 `TbMqttNode` 类能被扫描到。
|
||
|
||
#### 步骤 4:如果 UI 层面有过滤,修改前端代码
|
||
|
||
**可能的位置**:
|
||
- `ui-ngx/src/app/modules/home/pages/rulechain/rulechain-page.component.ts`
|
||
- `ui-ngx/src/app/modules/home/components/rule-node/`
|
||
|
||
**查找过滤逻辑**:
|
||
```typescript
|
||
// 查找是否有类似这样的过滤
|
||
if (nodeType === 'mqtt' && !isProfessionalEdition) {
|
||
return false; // 过滤掉
|
||
}
|
||
```
|
||
|
||
**修改方法**:移除或注释掉相关过滤逻辑。
|
||
|
||
### 方案 3:创建自定义 MQTT 转发节点(如果原节点不可用)
|
||
|
||
如果确实无法使用官方节点,可以创建一个自定义节点:
|
||
|
||
#### 步骤 1:创建自定义 MQTT 节点类
|
||
|
||
**新建文件**:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/CustomMqttForwardNode.java`
|
||
|
||
```java
|
||
package org.thingsboard.rule.engine.mqtt;
|
||
|
||
import io.netty.buffer.Unpooled;
|
||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||
import lombok.extern.slf4j.Slf4j;
|
||
import org.thingsboard.mqtt.MqttClient;
|
||
import org.thingsboard.mqtt.MqttClientConfig;
|
||
import org.thingsboard.rule.engine.api.RuleNode;
|
||
import org.thingsboard.rule.engine.api.TbContext;
|
||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
|
||
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
|
||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||
import org.thingsboard.server.common.msg.TbMsg;
|
||
|
||
import java.nio.charset.StandardCharsets;
|
||
import java.util.concurrent.TimeUnit;
|
||
import java.util.concurrent.TimeoutException;
|
||
|
||
@Slf4j
|
||
@RuleNode(
|
||
type = ComponentType.EXTERNAL,
|
||
name = "custom_mqtt_forward",
|
||
configClazz = TbMqttNodeConfiguration.class,
|
||
version = 1,
|
||
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
|
||
nodeDescription = "Custom MQTT Forward Node - Publish messages to external MQTT broker",
|
||
nodeDetails = "Will publish message payload to the external MQTT broker with QoS AT_LEAST_ONCE.",
|
||
configDirective = "tbExternalNodeMqttConfig",
|
||
icon = "call_split"
|
||
)
|
||
public class CustomMqttForwardNode extends TbAbstractExternalNode {
|
||
|
||
private TbMqttNodeConfiguration config;
|
||
private MqttClient mqttClient;
|
||
|
||
@Override
|
||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||
super.init(ctx);
|
||
this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
|
||
try {
|
||
this.mqttClient = initMqttClient(ctx);
|
||
log.info("[{}] Custom MQTT Forward Node initialized, connecting to {}:{}",
|
||
ctx.getSelfId(), config.getHost(), config.getPort());
|
||
} catch (Exception e) {
|
||
log.error("[{}] Failed to initialize MQTT client", ctx.getSelfId(), e);
|
||
throw new TbNodeException("Failed to initialize MQTT client: " + e.getMessage(), e);
|
||
}
|
||
}
|
||
|
||
@Override
|
||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||
try {
|
||
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
|
||
String payload = msg.getData();
|
||
|
||
log.debug("[{}] Publishing message to topic: {}, payload: {}",
|
||
ctx.getSelfId(), topic, payload);
|
||
|
||
var tbMsg = ackIfNeeded(ctx, msg);
|
||
|
||
this.mqttClient.publish(
|
||
topic,
|
||
Unpooled.wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8)),
|
||
MqttQoS.AT_LEAST_ONCE,
|
||
config.isRetainedMessage()
|
||
).addListener(future -> {
|
||
if (future.isSuccess()) {
|
||
log.debug("[{}] Successfully published to MQTT broker", ctx.getSelfId());
|
||
tellSuccess(ctx, tbMsg);
|
||
} else {
|
||
log.error("[{}] Failed to publish to MQTT broker", ctx.getSelfId(), future.cause());
|
||
tellFailure(ctx, tbMsg, future.cause());
|
||
}
|
||
});
|
||
} catch (Exception e) {
|
||
log.error("[{}] Error processing message", ctx.getSelfId(), e);
|
||
ctx.tellFailure(msg, e);
|
||
}
|
||
}
|
||
|
||
@Override
|
||
public void destroy() {
|
||
if (mqttClient != null) {
|
||
try {
|
||
mqttClient.disconnect();
|
||
log.info("MQTT client disconnected");
|
||
} catch (Exception e) {
|
||
log.error("Error disconnecting MQTT client", e);
|
||
}
|
||
}
|
||
}
|
||
|
||
private MqttClient initMqttClient(TbContext ctx) throws Exception {
|
||
MqttClientConfig mqttConfig = new MqttClientConfig();
|
||
|
||
// 设置客户端 ID
|
||
String clientId = config.getClientId();
|
||
if (clientId == null || clientId.isEmpty()) {
|
||
clientId = "tb-forwarder-" + ctx.getSelfId().getId();
|
||
}
|
||
if (config.isAppendClientIdSuffix()) {
|
||
clientId = clientId + "_" + ctx.getServiceId();
|
||
}
|
||
mqttConfig.setClientId(clientId);
|
||
|
||
// 设置其他配置
|
||
mqttConfig.setCleanSession(config.isCleanSession());
|
||
mqttConfig.setProtocolVersion(config.getProtocolVersion());
|
||
|
||
// 设置认证(如果需要)
|
||
if (config.getCredentials() != null) {
|
||
var credentials = config.getCredentials();
|
||
if (credentials.getType() == org.thingsboard.rule.engine.credentials.CredentialsType.BASIC) {
|
||
var basicCreds = (org.thingsboard.rule.engine.credentials.BasicCredentials) credentials;
|
||
mqttConfig.setUsername(basicCreds.getUsername());
|
||
mqttConfig.setPassword(basicCreds.getPassword());
|
||
}
|
||
}
|
||
|
||
// 设置 SSL(如果需要)
|
||
if (config.isSsl()) {
|
||
mqttConfig.setSslContext(config.getCredentials().initSslContext());
|
||
}
|
||
|
||
// 创建客户端
|
||
MqttClient client = MqttClient.create(mqttConfig, null, ctx.getExternalCallExecutor());
|
||
client.setEventLoop(ctx.getSharedEventLoop());
|
||
|
||
// 连接
|
||
var connectFuture = client.connect(config.getHost(), config.getPort());
|
||
var result = connectFuture.get(config.getConnectTimeoutSec(), TimeUnit.SECONDS);
|
||
|
||
if (!result.isSuccess()) {
|
||
throw new RuntimeException("Failed to connect to MQTT broker: " + result.getReturnCode());
|
||
}
|
||
|
||
return client;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 步骤 2:重新编译项目
|
||
|
||
```bash
|
||
cd /Users/sunpeng/workspace/things/thingsboard
|
||
|
||
# 编译规则引擎模块
|
||
mvn clean install -pl rule-engine/rule-engine-components -am -DskipTests
|
||
|
||
# 或者编译整个项目
|
||
mvn clean install -DskipTests
|
||
```
|
||
|
||
#### 步骤 3:重新打包和部署
|
||
|
||
```bash
|
||
# 重新构建 Docker 镜像
|
||
cd msa/tb
|
||
mvn clean install -DskipTests -Ddockerfile.skip=false
|
||
```
|
||
|
||
### 方案 4:使用规则引擎脚本节点实现转发(无需修改代码)
|
||
|
||
如果不想修改代码,可以使用 JavaScript 脚本节点 + HTTP 节点实现:
|
||
|
||
#### 步骤 1:在规则链中添加脚本节点
|
||
|
||
**脚本内容**:
|
||
```javascript
|
||
// 将遥测数据转换为 HTTP 请求格式
|
||
var data = JSON.parse(msg.data);
|
||
var metadata = msg.metadata;
|
||
|
||
// 构建要发送的数据
|
||
var payload = {
|
||
deviceName: metadata.deviceName,
|
||
deviceType: metadata.deviceType,
|
||
timestamp: metadata.ts,
|
||
telemetry: data
|
||
};
|
||
|
||
// 更新消息数据
|
||
return {
|
||
msg: JSON.stringify(payload),
|
||
metadata: metadata,
|
||
msgType: msgType
|
||
};
|
||
```
|
||
|
||
#### 步骤 2:添加 REST API Call 节点
|
||
|
||
配置你的外部服务 HTTP 接口,将数据转发出去。
|
||
|
||
**注意**:这种方式不如 MQTT 实时,但无需修改代码。
|
||
|
||
## 验证方法
|
||
|
||
### 1. 检查节点是否注册成功
|
||
|
||
**查看日志**:
|
||
```bash
|
||
docker logs mytb 2>&1 | grep -i "mqtt.*node\|component.*discovery"
|
||
```
|
||
|
||
应该看到类似:
|
||
```
|
||
Discovered rule node: org.thingsboard.rule.engine.mqtt.TbMqttNode
|
||
```
|
||
|
||
### 2. 测试 MQTT 转发
|
||
|
||
1. **配置规则链**:
|
||
- 添加消息类型过滤节点:`POST_TELEMETRY_REQUEST`
|
||
- 连接 MQTT 节点
|
||
- 配置 MQTT Broker 信息
|
||
|
||
2. **配置 MQTT 节点**:
|
||
```json
|
||
{
|
||
"topicPattern": "devices/${deviceName}/telemetry",
|
||
"host": "192.168.1.100",
|
||
"port": 1883,
|
||
"clientId": "tb-forwarder",
|
||
"cleanSession": true,
|
||
"retainedMessage": false,
|
||
"ssl": false,
|
||
"credentials": {
|
||
"type": "anonymous"
|
||
}
|
||
}
|
||
```
|
||
|
||
3. **外部服务订阅测试**:
|
||
```bash
|
||
mosquitto_sub -h 192.168.1.100 -p 1883 -t "devices/+/telemetry" -v
|
||
```
|
||
|
||
4. **发送测试数据**:
|
||
```bash
|
||
mosquitto_pub -h localhost -p 1883 \
|
||
-u YOUR_ACCESS_TOKEN \
|
||
-t "v1/devices/me/telemetry" \
|
||
-m '{"temperature":25,"humidity":60}'
|
||
```
|
||
|
||
5. **验证外部服务是否收到数据**
|
||
|
||
## 常见问题
|
||
|
||
### Q1: 节点在 UI 中看不到?
|
||
|
||
**可能原因**:
|
||
1. 组件未正确注册
|
||
2. UI 缓存问题
|
||
3. 版本不兼容
|
||
|
||
**解决方法**:
|
||
1. 清除浏览器缓存
|
||
2. 重启 ThingsBoard
|
||
3. 检查启动日志中的组件注册信息
|
||
|
||
### Q2: 连接 MQTT Broker 失败?
|
||
|
||
**检查**:
|
||
1. MQTT Broker 地址和端口是否正确
|
||
2. 网络是否可达
|
||
3. 认证信息是否正确
|
||
4. 防火墙规则
|
||
|
||
**日志**:
|
||
```bash
|
||
docker logs mytb 2>&1 | grep -i "mqtt.*connect\|mqtt.*error"
|
||
```
|
||
|
||
### Q3: 消息未转发?
|
||
|
||
**检查**:
|
||
1. 规则链是否正确连接
|
||
2. 消息类型过滤是否正确
|
||
3. 节点配置是否正确
|
||
4. 查看规则链执行日志
|
||
|
||
## 总结
|
||
|
||
1. **首先验证**:检查节点是否真的不可用
|
||
2. **如果确实不可用**:
|
||
- 方案 1:修改前端过滤逻辑(如果有)
|
||
- 方案 2:创建自定义节点(推荐)
|
||
- 方案 3:使用脚本 + HTTP 节点(临时方案)
|
||
|
||
3. **推荐方案**:使用自定义 MQTT 转发节点,功能完整且可控
|
||
|
||
## 相关文件位置
|
||
|
||
- MQTT 节点实现:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java`
|
||
- 节点配置类:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java`
|
||
- 组件发现服务:`application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java`
|
||
- 配置文件:`application/src/main/resources/thingsboard.yml`
|
||
|