thingsboard/summary/22-社区版MQTT转发功能启用指南.md

424 lines
13 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ThingsBoard 社区版 MQTT 转发功能启用指南
## 问题分析
根据源码分析,`TbMqttNode` 类本身**没有许可证检查**,代码逻辑上应该可以在社区版使用。但如果遇到无法使用的情况,可能是以下原因:
1. **UI 层面过滤**:前端可能隐藏了某些节点
2. **版本限制**:某些版本可能有限制
3. **配置问题**:节点配置不正确
## 源码分析
### 1. MQTT 节点实现
**位置**`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java`
**关键代码**
```java
@RuleNode(
type = ComponentType.EXTERNAL,
name = "mqtt",
configClazz = TbMqttNodeConfiguration.class,
version = 2,
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
nodeDescription = "Publish messages to the MQTT broker",
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
configDirective = "tbExternalNodeMqttConfig",
icon = "call_split",
docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/mqtt/"
)
public class TbMqttNode extends TbAbstractExternalNode {
// 没有许可证检查,直接实现功能
}
```
**结论**:代码层面**没有许可证限制**,应该可以正常使用。
## 解决方案
### 方案 1验证节点是否可用推荐先尝试
1. **检查节点是否已注册**
- 登录 ThingsBoard Web UI
- 进入规则链编辑器
- 尝试添加 "External" → "MQTT" 节点
- 如果能看到,说明节点可用
2. **如果节点不可见,检查日志**
```bash
# 查看 ThingsBoard 启动日志
docker logs mytb | grep -i "mqtt"
# 或
tail -f /var/log/thingsboard/thingsboard.log | grep -i "rule.*node\|mqtt"
```
### 方案 2修改代码确保功能可用如果确实不可用
#### 步骤 1确认节点类存在
检查文件是否存在:
```bash
ls -la thingsboard/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
```
#### 步骤 2检查组件扫描配置
**文件**`application/src/main/resources/thingsboard.yml`
查找配置:
```yaml
plugins:
scan_packages:
- "org.thingsboard.rule.engine"
```
确保 `org.thingsboard.rule.engine.mqtt` 在扫描路径内。
#### 步骤 3检查组件注册
**文件**`application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java`
关键方法:`registerRuleNodeComponents()`(约第 121 行)
```java
private void registerRuleNodeComponents() {
for (RuleNodeClassInfo def : ruleNodeClasses.values()) {
// ... 注册逻辑
ComponentDescriptor component = scanAndPersistComponent(def, type);
components.put(component.getClazz(), component);
putComponentIntoMaps(type, def.getAnnotation(), component);
}
}
```
**验证**:确保 `TbMqttNode` 类能被扫描到。
#### 步骤 4如果 UI 层面有过滤,修改前端代码
**可能的位置**
- `ui-ngx/src/app/modules/home/pages/rulechain/rulechain-page.component.ts`
- `ui-ngx/src/app/modules/home/components/rule-node/`
**查找过滤逻辑**
```typescript
// 查找是否有类似这样的过滤
if (nodeType === 'mqtt' && !isProfessionalEdition) {
return false; // 过滤掉
}
```
**修改方法**:移除或注释掉相关过滤逻辑。
### 方案 3创建自定义 MQTT 转发节点(如果原节点不可用)
如果确实无法使用官方节点,可以创建一个自定义节点:
#### 步骤 1创建自定义 MQTT 节点类
**新建文件**`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/CustomMqttForwardNode.java`
```java
package org.thingsboard.rule.engine.mqtt;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
@RuleNode(
type = ComponentType.EXTERNAL,
name = "custom_mqtt_forward",
configClazz = TbMqttNodeConfiguration.class,
version = 1,
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
nodeDescription = "Custom MQTT Forward Node - Publish messages to external MQTT broker",
nodeDetails = "Will publish message payload to the external MQTT broker with QoS AT_LEAST_ONCE.",
configDirective = "tbExternalNodeMqttConfig",
icon = "call_split"
)
public class CustomMqttForwardNode extends TbAbstractExternalNode {
private TbMqttNodeConfiguration config;
private MqttClient mqttClient;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
try {
this.mqttClient = initMqttClient(ctx);
log.info("[{}] Custom MQTT Forward Node initialized, connecting to {}:{}",
ctx.getSelfId(), config.getHost(), config.getPort());
} catch (Exception e) {
log.error("[{}] Failed to initialize MQTT client", ctx.getSelfId(), e);
throw new TbNodeException("Failed to initialize MQTT client: " + e.getMessage(), e);
}
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
String payload = msg.getData();
log.debug("[{}] Publishing message to topic: {}, payload: {}",
ctx.getSelfId(), topic, payload);
var tbMsg = ackIfNeeded(ctx, msg);
this.mqttClient.publish(
topic,
Unpooled.wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8)),
MqttQoS.AT_LEAST_ONCE,
config.isRetainedMessage()
).addListener(future -> {
if (future.isSuccess()) {
log.debug("[{}] Successfully published to MQTT broker", ctx.getSelfId());
tellSuccess(ctx, tbMsg);
} else {
log.error("[{}] Failed to publish to MQTT broker", ctx.getSelfId(), future.cause());
tellFailure(ctx, tbMsg, future.cause());
}
});
} catch (Exception e) {
log.error("[{}] Error processing message", ctx.getSelfId(), e);
ctx.tellFailure(msg, e);
}
}
@Override
public void destroy() {
if (mqttClient != null) {
try {
mqttClient.disconnect();
log.info("MQTT client disconnected");
} catch (Exception e) {
log.error("Error disconnecting MQTT client", e);
}
}
}
private MqttClient initMqttClient(TbContext ctx) throws Exception {
MqttClientConfig mqttConfig = new MqttClientConfig();
// 设置客户端 ID
String clientId = config.getClientId();
if (clientId == null || clientId.isEmpty()) {
clientId = "tb-forwarder-" + ctx.getSelfId().getId();
}
if (config.isAppendClientIdSuffix()) {
clientId = clientId + "_" + ctx.getServiceId();
}
mqttConfig.setClientId(clientId);
// 设置其他配置
mqttConfig.setCleanSession(config.isCleanSession());
mqttConfig.setProtocolVersion(config.getProtocolVersion());
// 设置认证(如果需要)
if (config.getCredentials() != null) {
var credentials = config.getCredentials();
if (credentials.getType() == org.thingsboard.rule.engine.credentials.CredentialsType.BASIC) {
var basicCreds = (org.thingsboard.rule.engine.credentials.BasicCredentials) credentials;
mqttConfig.setUsername(basicCreds.getUsername());
mqttConfig.setPassword(basicCreds.getPassword());
}
}
// 设置 SSL如果需要
if (config.isSsl()) {
mqttConfig.setSslContext(config.getCredentials().initSslContext());
}
// 创建客户端
MqttClient client = MqttClient.create(mqttConfig, null, ctx.getExternalCallExecutor());
client.setEventLoop(ctx.getSharedEventLoop());
// 连接
var connectFuture = client.connect(config.getHost(), config.getPort());
var result = connectFuture.get(config.getConnectTimeoutSec(), TimeUnit.SECONDS);
if (!result.isSuccess()) {
throw new RuntimeException("Failed to connect to MQTT broker: " + result.getReturnCode());
}
return client;
}
}
```
#### 步骤 2重新编译项目
```bash
cd /Users/sunpeng/workspace/things/thingsboard
# 编译规则引擎模块
mvn clean install -pl rule-engine/rule-engine-components -am -DskipTests
# 或者编译整个项目
mvn clean install -DskipTests
```
#### 步骤 3重新打包和部署
```bash
# 重新构建 Docker 镜像
cd msa/tb
mvn clean install -DskipTests -Ddockerfile.skip=false
```
### 方案 4使用规则引擎脚本节点实现转发无需修改代码
如果不想修改代码,可以使用 JavaScript 脚本节点 + HTTP 节点实现:
#### 步骤 1在规则链中添加脚本节点
**脚本内容**
```javascript
// 将遥测数据转换为 HTTP 请求格式
var data = JSON.parse(msg.data);
var metadata = msg.metadata;
// 构建要发送的数据
var payload = {
deviceName: metadata.deviceName,
deviceType: metadata.deviceType,
timestamp: metadata.ts,
telemetry: data
};
// 更新消息数据
return {
msg: JSON.stringify(payload),
metadata: metadata,
msgType: msgType
};
```
#### 步骤 2添加 REST API Call 节点
配置你的外部服务 HTTP 接口,将数据转发出去。
**注意**:这种方式不如 MQTT 实时,但无需修改代码。
## 验证方法
### 1. 检查节点是否注册成功
**查看日志**
```bash
docker logs mytb 2>&1 | grep -i "mqtt.*node\|component.*discovery"
```
应该看到类似:
```
Discovered rule node: org.thingsboard.rule.engine.mqtt.TbMqttNode
```
### 2. 测试 MQTT 转发
1. **配置规则链**
- 添加消息类型过滤节点:`POST_TELEMETRY_REQUEST`
- 连接 MQTT 节点
- 配置 MQTT Broker 信息
2. **配置 MQTT 节点**
```json
{
"topicPattern": "devices/${deviceName}/telemetry",
"host": "192.168.1.100",
"port": 1883,
"clientId": "tb-forwarder",
"cleanSession": true,
"retainedMessage": false,
"ssl": false,
"credentials": {
"type": "anonymous"
}
}
```
3. **外部服务订阅测试**
```bash
mosquitto_sub -h 192.168.1.100 -p 1883 -t "devices/+/telemetry" -v
```
4. **发送测试数据**
```bash
mosquitto_pub -h localhost -p 1883 \
-u YOUR_ACCESS_TOKEN \
-t "v1/devices/me/telemetry" \
-m '{"temperature":25,"humidity":60}'
```
5. **验证外部服务是否收到数据**
## 常见问题
### Q1: 节点在 UI 中看不到?
**可能原因**
1. 组件未正确注册
2. UI 缓存问题
3. 版本不兼容
**解决方法**
1. 清除浏览器缓存
2. 重启 ThingsBoard
3. 检查启动日志中的组件注册信息
### Q2: 连接 MQTT Broker 失败?
**检查**
1. MQTT Broker 地址和端口是否正确
2. 网络是否可达
3. 认证信息是否正确
4. 防火墙规则
**日志**
```bash
docker logs mytb 2>&1 | grep -i "mqtt.*connect\|mqtt.*error"
```
### Q3: 消息未转发?
**检查**
1. 规则链是否正确连接
2. 消息类型过滤是否正确
3. 节点配置是否正确
4. 查看规则链执行日志
## 总结
1. **首先验证**:检查节点是否真的不可用
2. **如果确实不可用**
- 方案 1修改前端过滤逻辑如果有
- 方案 2创建自定义节点推荐
- 方案 3使用脚本 + HTTP 节点(临时方案)
3. **推荐方案**:使用自定义 MQTT 转发节点,功能完整且可控
## 相关文件位置
- MQTT 节点实现:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java`
- 节点配置类:`rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java`
- 组件发现服务:`application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java`
- 配置文件:`application/src/main/resources/thingsboard.yml`