13 KiB
ThingsBoard 社区版 MQTT 转发功能启用指南
问题分析
根据源码分析,TbMqttNode 类本身没有许可证检查,代码逻辑上应该可以在社区版使用。但如果遇到无法使用的情况,可能是以下原因:
- UI 层面过滤:前端可能隐藏了某些节点
- 版本限制:某些版本可能有限制
- 配置问题:节点配置不正确
源码分析
1. MQTT 节点实现
位置:rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
关键代码:
@RuleNode(
type = ComponentType.EXTERNAL,
name = "mqtt",
configClazz = TbMqttNodeConfiguration.class,
version = 2,
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
nodeDescription = "Publish messages to the MQTT broker",
nodeDetails = "Will publish message payload to the MQTT broker with QoS <b>AT_LEAST_ONCE</b>.",
configDirective = "tbExternalNodeMqttConfig",
icon = "call_split",
docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/mqtt/"
)
public class TbMqttNode extends TbAbstractExternalNode {
// 没有许可证检查,直接实现功能
}
结论:代码层面没有许可证限制,应该可以正常使用。
解决方案
方案 1:验证节点是否可用(推荐先尝试)
-
检查节点是否已注册
- 登录 ThingsBoard Web UI
- 进入规则链编辑器
- 尝试添加 "External" → "MQTT" 节点
- 如果能看到,说明节点可用
-
如果节点不可见,检查日志
# 查看 ThingsBoard 启动日志 docker logs mytb | grep -i "mqtt" # 或 tail -f /var/log/thingsboard/thingsboard.log | grep -i "rule.*node\|mqtt"
方案 2:修改代码确保功能可用(如果确实不可用)
步骤 1:确认节点类存在
检查文件是否存在:
ls -la thingsboard/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java
步骤 2:检查组件扫描配置
文件:application/src/main/resources/thingsboard.yml
查找配置:
plugins:
scan_packages:
- "org.thingsboard.rule.engine"
确保 org.thingsboard.rule.engine.mqtt 在扫描路径内。
步骤 3:检查组件注册
文件:application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
关键方法:registerRuleNodeComponents()(约第 121 行)
private void registerRuleNodeComponents() {
for (RuleNodeClassInfo def : ruleNodeClasses.values()) {
// ... 注册逻辑
ComponentDescriptor component = scanAndPersistComponent(def, type);
components.put(component.getClazz(), component);
putComponentIntoMaps(type, def.getAnnotation(), component);
}
}
验证:确保 TbMqttNode 类能被扫描到。
步骤 4:如果 UI 层面有过滤,修改前端代码
可能的位置:
ui-ngx/src/app/modules/home/pages/rulechain/rulechain-page.component.tsui-ngx/src/app/modules/home/components/rule-node/
查找过滤逻辑:
// 查找是否有类似这样的过滤
if (nodeType === 'mqtt' && !isProfessionalEdition) {
return false; // 过滤掉
}
修改方法:移除或注释掉相关过滤逻辑。
方案 3:创建自定义 MQTT 转发节点(如果原节点不可用)
如果确实无法使用官方节点,可以创建一个自定义节点:
步骤 1:创建自定义 MQTT 节点类
新建文件:rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/CustomMqttForwardNode.java
package org.thingsboard.rule.engine.mqtt;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentClusteringMode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
@RuleNode(
type = ComponentType.EXTERNAL,
name = "custom_mqtt_forward",
configClazz = TbMqttNodeConfiguration.class,
version = 1,
clusteringMode = ComponentClusteringMode.USER_PREFERENCE,
nodeDescription = "Custom MQTT Forward Node - Publish messages to external MQTT broker",
nodeDetails = "Will publish message payload to the external MQTT broker with QoS AT_LEAST_ONCE.",
configDirective = "tbExternalNodeMqttConfig",
icon = "call_split"
)
public class CustomMqttForwardNode extends TbAbstractExternalNode {
private TbMqttNodeConfiguration config;
private MqttClient mqttClient;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx);
this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
try {
this.mqttClient = initMqttClient(ctx);
log.info("[{}] Custom MQTT Forward Node initialized, connecting to {}:{}",
ctx.getSelfId(), config.getHost(), config.getPort());
} catch (Exception e) {
log.error("[{}] Failed to initialize MQTT client", ctx.getSelfId(), e);
throw new TbNodeException("Failed to initialize MQTT client: " + e.getMessage(), e);
}
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
try {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
String payload = msg.getData();
log.debug("[{}] Publishing message to topic: {}, payload: {}",
ctx.getSelfId(), topic, payload);
var tbMsg = ackIfNeeded(ctx, msg);
this.mqttClient.publish(
topic,
Unpooled.wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8)),
MqttQoS.AT_LEAST_ONCE,
config.isRetainedMessage()
).addListener(future -> {
if (future.isSuccess()) {
log.debug("[{}] Successfully published to MQTT broker", ctx.getSelfId());
tellSuccess(ctx, tbMsg);
} else {
log.error("[{}] Failed to publish to MQTT broker", ctx.getSelfId(), future.cause());
tellFailure(ctx, tbMsg, future.cause());
}
});
} catch (Exception e) {
log.error("[{}] Error processing message", ctx.getSelfId(), e);
ctx.tellFailure(msg, e);
}
}
@Override
public void destroy() {
if (mqttClient != null) {
try {
mqttClient.disconnect();
log.info("MQTT client disconnected");
} catch (Exception e) {
log.error("Error disconnecting MQTT client", e);
}
}
}
private MqttClient initMqttClient(TbContext ctx) throws Exception {
MqttClientConfig mqttConfig = new MqttClientConfig();
// 设置客户端 ID
String clientId = config.getClientId();
if (clientId == null || clientId.isEmpty()) {
clientId = "tb-forwarder-" + ctx.getSelfId().getId();
}
if (config.isAppendClientIdSuffix()) {
clientId = clientId + "_" + ctx.getServiceId();
}
mqttConfig.setClientId(clientId);
// 设置其他配置
mqttConfig.setCleanSession(config.isCleanSession());
mqttConfig.setProtocolVersion(config.getProtocolVersion());
// 设置认证(如果需要)
if (config.getCredentials() != null) {
var credentials = config.getCredentials();
if (credentials.getType() == org.thingsboard.rule.engine.credentials.CredentialsType.BASIC) {
var basicCreds = (org.thingsboard.rule.engine.credentials.BasicCredentials) credentials;
mqttConfig.setUsername(basicCreds.getUsername());
mqttConfig.setPassword(basicCreds.getPassword());
}
}
// 设置 SSL(如果需要)
if (config.isSsl()) {
mqttConfig.setSslContext(config.getCredentials().initSslContext());
}
// 创建客户端
MqttClient client = MqttClient.create(mqttConfig, null, ctx.getExternalCallExecutor());
client.setEventLoop(ctx.getSharedEventLoop());
// 连接
var connectFuture = client.connect(config.getHost(), config.getPort());
var result = connectFuture.get(config.getConnectTimeoutSec(), TimeUnit.SECONDS);
if (!result.isSuccess()) {
throw new RuntimeException("Failed to connect to MQTT broker: " + result.getReturnCode());
}
return client;
}
}
步骤 2:重新编译项目
cd /Users/sunpeng/workspace/things/thingsboard
# 编译规则引擎模块
mvn clean install -pl rule-engine/rule-engine-components -am -DskipTests
# 或者编译整个项目
mvn clean install -DskipTests
步骤 3:重新打包和部署
# 重新构建 Docker 镜像
cd msa/tb
mvn clean install -DskipTests -Ddockerfile.skip=false
方案 4:使用规则引擎脚本节点实现转发(无需修改代码)
如果不想修改代码,可以使用 JavaScript 脚本节点 + HTTP 节点实现:
步骤 1:在规则链中添加脚本节点
脚本内容:
// 将遥测数据转换为 HTTP 请求格式
var data = JSON.parse(msg.data);
var metadata = msg.metadata;
// 构建要发送的数据
var payload = {
deviceName: metadata.deviceName,
deviceType: metadata.deviceType,
timestamp: metadata.ts,
telemetry: data
};
// 更新消息数据
return {
msg: JSON.stringify(payload),
metadata: metadata,
msgType: msgType
};
步骤 2:添加 REST API Call 节点
配置你的外部服务 HTTP 接口,将数据转发出去。
注意:这种方式不如 MQTT 实时,但无需修改代码。
验证方法
1. 检查节点是否注册成功
查看日志:
docker logs mytb 2>&1 | grep -i "mqtt.*node\|component.*discovery"
应该看到类似:
Discovered rule node: org.thingsboard.rule.engine.mqtt.TbMqttNode
2. 测试 MQTT 转发
-
配置规则链:
- 添加消息类型过滤节点:
POST_TELEMETRY_REQUEST - 连接 MQTT 节点
- 配置 MQTT Broker 信息
- 添加消息类型过滤节点:
-
配置 MQTT 节点:
{ "topicPattern": "devices/${deviceName}/telemetry", "host": "192.168.1.100", "port": 1883, "clientId": "tb-forwarder", "cleanSession": true, "retainedMessage": false, "ssl": false, "credentials": { "type": "anonymous" } } -
外部服务订阅测试:
mosquitto_sub -h 192.168.1.100 -p 1883 -t "devices/+/telemetry" -v -
发送测试数据:
mosquitto_pub -h localhost -p 1883 \ -u YOUR_ACCESS_TOKEN \ -t "v1/devices/me/telemetry" \ -m '{"temperature":25,"humidity":60}' -
验证外部服务是否收到数据
常见问题
Q1: 节点在 UI 中看不到?
可能原因:
- 组件未正确注册
- UI 缓存问题
- 版本不兼容
解决方法:
- 清除浏览器缓存
- 重启 ThingsBoard
- 检查启动日志中的组件注册信息
Q2: 连接 MQTT Broker 失败?
检查:
- MQTT Broker 地址和端口是否正确
- 网络是否可达
- 认证信息是否正确
- 防火墙规则
日志:
docker logs mytb 2>&1 | grep -i "mqtt.*connect\|mqtt.*error"
Q3: 消息未转发?
检查:
- 规则链是否正确连接
- 消息类型过滤是否正确
- 节点配置是否正确
- 查看规则链执行日志
总结
-
首先验证:检查节点是否真的不可用
-
如果确实不可用:
- 方案 1:修改前端过滤逻辑(如果有)
- 方案 2:创建自定义节点(推荐)
- 方案 3:使用脚本 + HTTP 节点(临时方案)
-
推荐方案:使用自定义 MQTT 转发节点,功能完整且可控
相关文件位置
- MQTT 节点实现:
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java - 节点配置类:
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java - 组件发现服务:
application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java - 配置文件:
application/src/main/resources/thingsboard.yml