424 lines
13 KiB
Markdown
424 lines
13 KiB
Markdown
|
|
# ThingsBoard 社区版 MQTT 转发功能启用指南
|
|||
|
|
|
|||
|
|
## 问题分析
|
|||
|
|
|
|||
|
|
根据源码分析,`TbMqttNode` 类本身**没有许可证检查**,代码逻辑上应该可以在社区版使用。但如果遇到无法使用的情况,可能是以下原因:
|
|||
|
|
|
|||
|
|
1. **UI 层面过滤**:前端可能隐藏了某些节点
|
|||
|
|
2. **版本限制**:某些版本可能有限制
|
|||
|
|
3. **配置问题**:节点配置不正确
|
|||
|
|
|
|||
|
|
## 源码分析
|
|||
|
|
|
|||
|
|
### 1. MQTT 节点实现
|
|||
|
|
|
|||
|
|
**位置**:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java`
|
|||
|
|
|
|||
|
|
**关键代码**:
|
|||
|
|
```java
|
|||
|
|
@RuleNode(
|
|||
|
|
type = ComponentType.EXTERNAL,
|
|||
|
|
name = "mqtt",
|
|||
|
|
configClazz = TbMqttNodeConfiguration.class,
|
|||
|
|
version = 2,
|
|||
|
|
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
|
|||
|
|
nodeDescription = "Publish messages to the MQTT broker",
|
|||
|
|
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
|
|||
|
|
configDirective = "tbExternalNodeMqttConfig",
|
|||
|
|
icon = "call_split",
|
|||
|
|
docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/mqtt/"
|
|||
|
|
)
|
|||
|
|
public class TbMqttNode extends TbAbstractExternalNode {
|
|||
|
|
// 没有许可证检查,直接实现功能
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**结论**:代码层面**没有许可证限制**,应该可以正常使用。
|
|||
|
|
|
|||
|
|
## 解决方案
|
|||
|
|
|
|||
|
|
### 方案 1:验证节点是否可用(推荐先尝试)
|
|||
|
|
|
|||
|
|
1. **检查节点是否已注册**
|
|||
|
|
- 登录 ThingsBoard Web UI
|
|||
|
|
- 进入规则链编辑器
|
|||
|
|
- 尝试添加 "External" → "MQTT" 节点
|
|||
|
|
- 如果能看到,说明节点可用
|
|||
|
|
|
|||
|
|
2. **如果节点不可见,检查日志**
|
|||
|
|
```bash
|
|||
|
|
# 查看 ThingsBoard 启动日志
|
|||
|
|
docker logs mytb | grep -i "mqtt"
|
|||
|
|
# 或
|
|||
|
|
tail -f /var/log/thingsboard/thingsboard.log | grep -i "rule.*node\|mqtt"
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 方案 2:修改代码确保功能可用(如果确实不可用)
|
|||
|
|
|
|||
|
|
#### 步骤 1:确认节点类存在
|
|||
|
|
|
|||
|
|
检查文件是否存在:
|
|||
|
|
```bash
|
|||
|
|
ls -la thingsboard/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 步骤 2:检查组件扫描配置
|
|||
|
|
|
|||
|
|
**文件**:`application/src/main/resources/thingsboard.yml`
|
|||
|
|
|
|||
|
|
查找配置:
|
|||
|
|
```yaml
|
|||
|
|
plugins:
|
|||
|
|
scan_packages:
|
|||
|
|
- "org.thingsboard.rule.engine"
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
确保 `org.thingsboard.rule.engine.mqtt` 在扫描路径内。
|
|||
|
|
|
|||
|
|
#### 步骤 3:检查组件注册
|
|||
|
|
|
|||
|
|
**文件**:`application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java`
|
|||
|
|
|
|||
|
|
关键方法:`registerRuleNodeComponents()`(约第 121 行)
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
private void registerRuleNodeComponents() {
|
|||
|
|
for (RuleNodeClassInfo def : ruleNodeClasses.values()) {
|
|||
|
|
// ... 注册逻辑
|
|||
|
|
ComponentDescriptor component = scanAndPersistComponent(def, type);
|
|||
|
|
components.put(component.getClazz(), component);
|
|||
|
|
putComponentIntoMaps(type, def.getAnnotation(), component);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**验证**:确保 `TbMqttNode` 类能被扫描到。
|
|||
|
|
|
|||
|
|
#### 步骤 4:如果 UI 层面有过滤,修改前端代码
|
|||
|
|
|
|||
|
|
**可能的位置**:
|
|||
|
|
- `ui-ngx/src/app/modules/home/pages/rulechain/rulechain-page.component.ts`
|
|||
|
|
- `ui-ngx/src/app/modules/home/components/rule-node/`
|
|||
|
|
|
|||
|
|
**查找过滤逻辑**:
|
|||
|
|
```typescript
|
|||
|
|
// 查找是否有类似这样的过滤
|
|||
|
|
if (nodeType === 'mqtt' && !isProfessionalEdition) {
|
|||
|
|
return false; // 过滤掉
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
**修改方法**:移除或注释掉相关过滤逻辑。
|
|||
|
|
|
|||
|
|
### 方案 3:创建自定义 MQTT 转发节点(如果原节点不可用)
|
|||
|
|
|
|||
|
|
如果确实无法使用官方节点,可以创建一个自定义节点:
|
|||
|
|
|
|||
|
|
#### 步骤 1:创建自定义 MQTT 节点类
|
|||
|
|
|
|||
|
|
**新建文件**:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/CustomMqttForwardNode.java`
|
|||
|
|
|
|||
|
|
```java
|
|||
|
|
package org.thingsboard.rule.engine.mqtt;
|
|||
|
|
|
|||
|
|
import io.netty.buffer.Unpooled;
|
|||
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|||
|
|
import lombok.extern.slf4j.Slf4j;
|
|||
|
|
import org.thingsboard.mqtt.MqttClient;
|
|||
|
|
import org.thingsboard.mqtt.MqttClientConfig;
|
|||
|
|
import org.thingsboard.rule.engine.api.RuleNode;
|
|||
|
|
import org.thingsboard.rule.engine.api.TbContext;
|
|||
|
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
|||
|
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
|||
|
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
|||
|
|
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
|
|||
|
|
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
|
|||
|
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
|||
|
|
import org.thingsboard.server.common.msg.TbMsg;
|
|||
|
|
|
|||
|
|
import java.nio.charset.StandardCharsets;
|
|||
|
|
import java.util.concurrent.TimeUnit;
|
|||
|
|
import java.util.concurrent.TimeoutException;
|
|||
|
|
|
|||
|
|
@Slf4j
|
|||
|
|
@RuleNode(
|
|||
|
|
type = ComponentType.EXTERNAL,
|
|||
|
|
name = "custom_mqtt_forward",
|
|||
|
|
configClazz = TbMqttNodeConfiguration.class,
|
|||
|
|
version = 1,
|
|||
|
|
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
|
|||
|
|
nodeDescription = "Custom MQTT Forward Node - Publish messages to external MQTT broker",
|
|||
|
|
nodeDetails = "Will publish message payload to the external MQTT broker with QoS AT_LEAST_ONCE.",
|
|||
|
|
configDirective = "tbExternalNodeMqttConfig",
|
|||
|
|
icon = "call_split"
|
|||
|
|
)
|
|||
|
|
public class CustomMqttForwardNode extends TbAbstractExternalNode {
|
|||
|
|
|
|||
|
|
private TbMqttNodeConfiguration config;
|
|||
|
|
private MqttClient mqttClient;
|
|||
|
|
|
|||
|
|
@Override
|
|||
|
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
|||
|
|
super.init(ctx);
|
|||
|
|
this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
|
|||
|
|
try {
|
|||
|
|
this.mqttClient = initMqttClient(ctx);
|
|||
|
|
log.info("[{}] Custom MQTT Forward Node initialized, connecting to {}:{}",
|
|||
|
|
ctx.getSelfId(), config.getHost(), config.getPort());
|
|||
|
|
} catch (Exception e) {
|
|||
|
|
log.error("[{}] Failed to initialize MQTT client", ctx.getSelfId(), e);
|
|||
|
|
throw new TbNodeException("Failed to initialize MQTT client: " + e.getMessage(), e);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
@Override
|
|||
|
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
|||
|
|
try {
|
|||
|
|
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
|
|||
|
|
String payload = msg.getData();
|
|||
|
|
|
|||
|
|
log.debug("[{}] Publishing message to topic: {}, payload: {}",
|
|||
|
|
ctx.getSelfId(), topic, payload);
|
|||
|
|
|
|||
|
|
var tbMsg = ackIfNeeded(ctx, msg);
|
|||
|
|
|
|||
|
|
this.mqttClient.publish(
|
|||
|
|
topic,
|
|||
|
|
Unpooled.wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8)),
|
|||
|
|
MqttQoS.AT_LEAST_ONCE,
|
|||
|
|
config.isRetainedMessage()
|
|||
|
|
).addListener(future -> {
|
|||
|
|
if (future.isSuccess()) {
|
|||
|
|
log.debug("[{}] Successfully published to MQTT broker", ctx.getSelfId());
|
|||
|
|
tellSuccess(ctx, tbMsg);
|
|||
|
|
} else {
|
|||
|
|
log.error("[{}] Failed to publish to MQTT broker", ctx.getSelfId(), future.cause());
|
|||
|
|
tellFailure(ctx, tbMsg, future.cause());
|
|||
|
|
}
|
|||
|
|
});
|
|||
|
|
} catch (Exception e) {
|
|||
|
|
log.error("[{}] Error processing message", ctx.getSelfId(), e);
|
|||
|
|
ctx.tellFailure(msg, e);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
@Override
|
|||
|
|
public void destroy() {
|
|||
|
|
if (mqttClient != null) {
|
|||
|
|
try {
|
|||
|
|
mqttClient.disconnect();
|
|||
|
|
log.info("MQTT client disconnected");
|
|||
|
|
} catch (Exception e) {
|
|||
|
|
log.error("Error disconnecting MQTT client", e);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
private MqttClient initMqttClient(TbContext ctx) throws Exception {
|
|||
|
|
MqttClientConfig mqttConfig = new MqttClientConfig();
|
|||
|
|
|
|||
|
|
// 设置客户端 ID
|
|||
|
|
String clientId = config.getClientId();
|
|||
|
|
if (clientId == null || clientId.isEmpty()) {
|
|||
|
|
clientId = "tb-forwarder-" + ctx.getSelfId().getId();
|
|||
|
|
}
|
|||
|
|
if (config.isAppendClientIdSuffix()) {
|
|||
|
|
clientId = clientId + "_" + ctx.getServiceId();
|
|||
|
|
}
|
|||
|
|
mqttConfig.setClientId(clientId);
|
|||
|
|
|
|||
|
|
// 设置其他配置
|
|||
|
|
mqttConfig.setCleanSession(config.isCleanSession());
|
|||
|
|
mqttConfig.setProtocolVersion(config.getProtocolVersion());
|
|||
|
|
|
|||
|
|
// 设置认证(如果需要)
|
|||
|
|
if (config.getCredentials() != null) {
|
|||
|
|
var credentials = config.getCredentials();
|
|||
|
|
if (credentials.getType() == org.thingsboard.rule.engine.credentials.CredentialsType.BASIC) {
|
|||
|
|
var basicCreds = (org.thingsboard.rule.engine.credentials.BasicCredentials) credentials;
|
|||
|
|
mqttConfig.setUsername(basicCreds.getUsername());
|
|||
|
|
mqttConfig.setPassword(basicCreds.getPassword());
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 设置 SSL(如果需要)
|
|||
|
|
if (config.isSsl()) {
|
|||
|
|
mqttConfig.setSslContext(config.getCredentials().initSslContext());
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 创建客户端
|
|||
|
|
MqttClient client = MqttClient.create(mqttConfig, null, ctx.getExternalCallExecutor());
|
|||
|
|
client.setEventLoop(ctx.getSharedEventLoop());
|
|||
|
|
|
|||
|
|
// 连接
|
|||
|
|
var connectFuture = client.connect(config.getHost(), config.getPort());
|
|||
|
|
var result = connectFuture.get(config.getConnectTimeoutSec(), TimeUnit.SECONDS);
|
|||
|
|
|
|||
|
|
if (!result.isSuccess()) {
|
|||
|
|
throw new RuntimeException("Failed to connect to MQTT broker: " + result.getReturnCode());
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return client;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 步骤 2:重新编译项目
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
cd /Users/sunpeng/workspace/things/thingsboard
|
|||
|
|
|
|||
|
|
# 编译规则引擎模块
|
|||
|
|
mvn clean install -pl rule-engine/rule-engine-components -am -DskipTests
|
|||
|
|
|
|||
|
|
# 或者编译整个项目
|
|||
|
|
mvn clean install -DskipTests
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 步骤 3:重新打包和部署
|
|||
|
|
|
|||
|
|
```bash
|
|||
|
|
# 重新构建 Docker 镜像
|
|||
|
|
cd msa/tb
|
|||
|
|
mvn clean install -DskipTests -Ddockerfile.skip=false
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 方案 4:使用规则引擎脚本节点实现转发(无需修改代码)
|
|||
|
|
|
|||
|
|
如果不想修改代码,可以使用 JavaScript 脚本节点 + HTTP 节点实现:
|
|||
|
|
|
|||
|
|
#### 步骤 1:在规则链中添加脚本节点
|
|||
|
|
|
|||
|
|
**脚本内容**:
|
|||
|
|
```javascript
|
|||
|
|
// 将遥测数据转换为 HTTP 请求格式
|
|||
|
|
var data = JSON.parse(msg.data);
|
|||
|
|
var metadata = msg.metadata;
|
|||
|
|
|
|||
|
|
// 构建要发送的数据
|
|||
|
|
var payload = {
|
|||
|
|
deviceName: metadata.deviceName,
|
|||
|
|
deviceType: metadata.deviceType,
|
|||
|
|
timestamp: metadata.ts,
|
|||
|
|
telemetry: data
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// 更新消息数据
|
|||
|
|
return {
|
|||
|
|
msg: JSON.stringify(payload),
|
|||
|
|
metadata: metadata,
|
|||
|
|
msgType: msgType
|
|||
|
|
};
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
#### 步骤 2:添加 REST API Call 节点
|
|||
|
|
|
|||
|
|
配置你的外部服务 HTTP 接口,将数据转发出去。
|
|||
|
|
|
|||
|
|
**注意**:这种方式不如 MQTT 实时,但无需修改代码。
|
|||
|
|
|
|||
|
|
## 验证方法
|
|||
|
|
|
|||
|
|
### 1. 检查节点是否注册成功
|
|||
|
|
|
|||
|
|
**查看日志**:
|
|||
|
|
```bash
|
|||
|
|
docker logs mytb 2>&1 | grep -i "mqtt.*node\|component.*discovery"
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
应该看到类似:
|
|||
|
|
```
|
|||
|
|
Discovered rule node: org.thingsboard.rule.engine.mqtt.TbMqttNode
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 2. 测试 MQTT 转发
|
|||
|
|
|
|||
|
|
1. **配置规则链**:
|
|||
|
|
- 添加消息类型过滤节点:`POST_TELEMETRY_REQUEST`
|
|||
|
|
- 连接 MQTT 节点
|
|||
|
|
- 配置 MQTT Broker 信息
|
|||
|
|
|
|||
|
|
2. **配置 MQTT 节点**:
|
|||
|
|
```json
|
|||
|
|
{
|
|||
|
|
"topicPattern": "devices/${deviceName}/telemetry",
|
|||
|
|
"host": "192.168.1.100",
|
|||
|
|
"port": 1883,
|
|||
|
|
"clientId": "tb-forwarder",
|
|||
|
|
"cleanSession": true,
|
|||
|
|
"retainedMessage": false,
|
|||
|
|
"ssl": false,
|
|||
|
|
"credentials": {
|
|||
|
|
"type": "anonymous"
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
3. **外部服务订阅测试**:
|
|||
|
|
```bash
|
|||
|
|
mosquitto_sub -h 192.168.1.100 -p 1883 -t "devices/+/telemetry" -v
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
4. **发送测试数据**:
|
|||
|
|
```bash
|
|||
|
|
mosquitto_pub -h localhost -p 1883 \
|
|||
|
|
-u YOUR_ACCESS_TOKEN \
|
|||
|
|
-t "v1/devices/me/telemetry" \
|
|||
|
|
-m '{"temperature":25,"humidity":60}'
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
5. **验证外部服务是否收到数据**
|
|||
|
|
|
|||
|
|
## 常见问题
|
|||
|
|
|
|||
|
|
### Q1: 节点在 UI 中看不到?
|
|||
|
|
|
|||
|
|
**可能原因**:
|
|||
|
|
1. 组件未正确注册
|
|||
|
|
2. UI 缓存问题
|
|||
|
|
3. 版本不兼容
|
|||
|
|
|
|||
|
|
**解决方法**:
|
|||
|
|
1. 清除浏览器缓存
|
|||
|
|
2. 重启 ThingsBoard
|
|||
|
|
3. 检查启动日志中的组件注册信息
|
|||
|
|
|
|||
|
|
### Q2: 连接 MQTT Broker 失败?
|
|||
|
|
|
|||
|
|
**检查**:
|
|||
|
|
1. MQTT Broker 地址和端口是否正确
|
|||
|
|
2. 网络是否可达
|
|||
|
|
3. 认证信息是否正确
|
|||
|
|
4. 防火墙规则
|
|||
|
|
|
|||
|
|
**日志**:
|
|||
|
|
```bash
|
|||
|
|
docker logs mytb 2>&1 | grep -i "mqtt.*connect\|mqtt.*error"
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### Q3: 消息未转发?
|
|||
|
|
|
|||
|
|
**检查**:
|
|||
|
|
1. 规则链是否正确连接
|
|||
|
|
2. 消息类型过滤是否正确
|
|||
|
|
3. 节点配置是否正确
|
|||
|
|
4. 查看规则链执行日志
|
|||
|
|
|
|||
|
|
## 总结
|
|||
|
|
|
|||
|
|
1. **首先验证**:检查节点是否真的不可用
|
|||
|
|
2. **如果确实不可用**:
|
|||
|
|
- 方案 1:修改前端过滤逻辑(如果有)
|
|||
|
|
- 方案 2:创建自定义节点(推荐)
|
|||
|
|
- 方案 3:使用脚本 + HTTP 节点(临时方案)
|
|||
|
|
|
|||
|
|
3. **推荐方案**:使用自定义 MQTT 转发节点,功能完整且可控
|
|||
|
|
|
|||
|
|
## 相关文件位置
|
|||
|
|
|
|||
|
|
- MQTT 节点实现:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java`
|
|||
|
|
- 节点配置类:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java`
|
|||
|
|
- 组件发现服务:`application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java`
|
|||
|
|
- 配置文件:`application/src/main/resources/thingsboard.yml`
|
|||
|
|
|