29 KiB
ThingsBoard 设备状态和遥测数据查询订阅指南
目录
查询设备遥测数据
1. 获取最新的遥测值(推荐)
REST API 端点:
GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries
参数说明:
entityType: 实体类型,通常是DEVICEentityId: 设备 ID(UUID)keys(可选): 遥测键名,多个用逗号分隔,如temperature,humidity。不指定则返回所有键useStrictDataTypes(可选): 是否使用严格数据类型,默认false(值转为字符串)
示例:
# 获取所有最新遥测值
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取指定键的最新遥测值
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature,humidity" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 使用严格数据类型(返回原始类型)
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&useStrictDataTypes=true" \
-H "X-Authorization: Bearer $JWT_TOKEN"
响应格式(默认,字符串类型):
{
"temperature": [
{
"ts": 1705732800000,
"value": "25.5"
}
],
"humidity": [
{
"ts": 1705732800000,
"value": "60.2"
}
]
}
响应格式(useStrictDataTypes=true,原始类型):
{
"temperature": [
{
"ts": 1705732800000,
"value": 25.5
}
],
"humidity": [
{
"ts": 1705732800000,
"value": 60.2
}
]
}
2. 获取历史遥测数据
REST API 端点:
GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries?keys={keys}&startTs={startTs}&endTs={endTs}
参数说明:
keys: 遥测键名,多个用逗号分隔startTs: 开始时间戳(毫秒,UTC)endTs: 结束时间戳(毫秒,UTC)interval(可选): 聚合间隔(毫秒),默认 0(不聚合)intervalType(可选): 聚合类型,如MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS,WEEKS,MONTHSagg(可选): 聚合函数,如MIN,MAX,AVG,SUM,COUNTlimit(可选): 最大返回数据点数,默认 100orderBy(可选): 排序方式,ASC或DESC,默认DESC
示例:
# 获取最近1小时的数据
START_TS=$(($(date +%s) * 1000 - 3600000))
END_TS=$(($(date +%s) * 1000))
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&limit=100" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取聚合数据(每小时平均值)
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&interval=3600000&agg=AVG" \
-H "X-Authorization: Bearer $JWT_TOKEN"
响应格式:
[
{
"ts": 1705732800000,
"value": "25.5"
},
{
"ts": 1705736400000,
"value": "26.0"
}
]
3. 获取所有遥测键名
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/keys/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN"
响应格式:
["temperature", "humidity", "pressure", "batteryLevel"]
查询设备属性
1. 获取设备属性值
REST API 端点:
GET /api/plugins/telemetry/{entityType}/{entityId}/values/attributes
参数说明:
keys(可选): 属性键名,多个用逗号分隔
示例:
# 获取所有属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取指定属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes?keys=model,firmware" \
-H "X-Authorization: Bearer $JWT_TOKEN"
2. 按作用域获取属性
属性分为三种作用域:
SERVER_SCOPE: 服务器端属性(ThingsBoard 管理)SHARED_SCOPE: 共享属性(服务器和客户端共享)CLIENT_SCOPE: 客户端属性(设备端管理)
# 获取服务器端属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取客户端属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/CLIENT_SCOPE" \
-H "X-Authorization: Bearer $JWT_TOKEN"
查询设备状态
1. 设备在线状态
设备状态存储在 SERVER_SCOPE 属性中,键名为 active。
# 查询设备是否在线
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
-H "X-Authorization: Bearer $JWT_TOKEN"
响应格式:
[
{
"key": "active",
"value": true,
"lastUpdateTs": 1705732800000
}
]
2. 设备连接信息
设备状态还包括以下信息(存储在 SERVER_SCOPE 属性或遥测中):
lastConnectTime: 最后连接时间lastDisconnectTime: 最后断开时间lastActivityTime: 最后活动时间
# 查询设备连接信息
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active,lastConnectTime,lastActivityTime" \
-H "X-Authorization: Bearer $JWT_TOKEN"
订阅实时更新
1. MQTT 订阅(设备端)
ThingsBoard 支持通过 MQTT 协议订阅设备属性更新和 RPC 请求。注意: MQTT 主要用于设备端接收服务器推送的属性更新,遥测数据通常由设备发布给服务器。
可订阅的 MQTT 主题
v1 主题格式:
| 主题 | 说明 | 用途 |
|---|---|---|
v1/devices/me/attributes |
属性更新 | 订阅服务器推送给设备的属性更新(共享属性和服务器属性) |
v1/devices/me/attributes/response/+ |
属性请求响应 | 订阅属性请求的响应(+ 是请求 ID) |
v1/devices/me/rpc/request/+ |
RPC 请求 | 订阅服务器发送的 RPC 请求(+ 是请求 ID) |
v1/devices/me/rpc/response/+ |
RPC 响应 | 订阅 RPC 响应的主题(用于客户端 RPC) |
v2 短主题格式(更高效):
| 主题 | 说明 | 对应 v1 主题 |
|---|---|---|
v2/a |
属性更新(短格式) | v1/devices/me/attributes |
v2/a/res/+ |
属性响应(短格式) | v1/devices/me/attributes/response/+ |
v2/r/req/+ |
RPC 请求(短格式) | v1/devices/me/rpc/request/+ |
v2/r/res/+ |
RPC 响应(短格式) | v1/devices/me/rpc/response/+ |
Python MQTT 订阅示例
import paho.mqtt.client as mqtt
import json
# MQTT 连接配置
THINGSBOARD_HOST = "localhost"
THINGSBOARD_PORT = 1883
ACCESS_TOKEN = "YOUR_DEVICE_ACCESS_TOKEN"
def on_connect(client, userdata, flags, rc):
"""连接成功回调"""
if rc == 0:
print("Connected to ThingsBoard")
# 订阅属性更新
client.subscribe('v1/devices/me/attributes', qos=1)
print("Subscribed to: v1/devices/me/attributes")
# 订阅属性请求响应
client.subscribe('v1/devices/me/attributes/response/+', qos=1)
print("Subscribed to: v1/devices/me/attributes/response/+")
# 订阅 RPC 请求
client.subscribe('v1/devices/me/rpc/request/+', qos=1)
print("Subscribed to: v1/devices/me/rpc/request/+")
# 请求服务器端属性(可选)
request_id = 1
request = {"keys": "active,lastConnectTime"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
print(f"Requested attributes: {request}")
else:
print(f"Failed to connect, return code {rc}")
def on_message(client, userdata, msg):
"""消息接收回调"""
topic = msg.topic
payload = msg.payload.decode('utf-8')
print(f"\n收到消息 - 主题: {topic}")
print(f"消息内容: {payload}")
try:
data = json.loads(payload)
# 处理属性更新
if topic == 'v1/devices/me/attributes':
print("=== 属性更新 ===")
for key, value in data.items():
print(f" {key}: {value}")
# 处理属性请求响应
elif topic.startswith('v1/devices/me/attributes/response/'):
request_id = topic.split('/')[-1]
print(f"=== 属性请求响应 (ID: {request_id}) ===")
if 'client' in data:
print("客户端属性:")
for key, value in data['client'].items():
print(f" {key}: {value}")
if 'shared' in data:
print("共享属性:")
for key, value in data['shared'].items():
print(f" {key}: {value}")
if 'server' in data:
print("服务器属性:")
for key, value in data['server'].items():
print(f" {key}: {value}")
# 处理 RPC 请求
elif topic.startswith('v1/devices/me/rpc/request/'):
request_id = topic.split('/')[-1]
print(f"=== RPC 请求 (ID: {request_id}) ===")
method = data.get('method', '')
params = data.get('params', {})
print(f"方法: {method}")
print(f"参数: {params}")
# 处理 RPC 请求并回复
if method == 'getCurrentTime':
response = {
"time": "2025-01-20 10:30:00"
}
elif method == 'setGpio':
# 处理 GPIO 设置
pin = params.get('pin')
value = params.get('value')
print(f"设置 GPIO Pin {pin} = {value}")
response = {"success": True, "pin": pin, "value": value}
else:
response = {"error": f"Unknown method: {method}"}
# 发送 RPC 响应
response_topic = f'v1/devices/me/rpc/response/{request_id}'
client.publish(response_topic, json.dumps(response), qos=1)
print(f"已发送 RPC 响应到: {response_topic}")
except json.JSONDecodeError:
print(f"无法解析 JSON: {payload}")
def on_disconnect(client, userdata, rc):
"""断开连接回调"""
print("Disconnected from ThingsBoard")
# 创建 MQTT 客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# 设置认证(使用 Access Token 作为用户名)
client.username_pw_set(ACCESS_TOKEN)
# 连接到 ThingsBoard
try:
client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
client.loop_forever()
except Exception as e:
print(f"连接失败: {e}")
请求设备属性(通过 MQTT)
设备可以通过发布请求消息来获取属性值:
# 请求客户端属性
request_id = 1
request = {"clientKeys": "model,version"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
# 请求共享属性
request_id = 2
request = {"sharedKeys": "config"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
# 请求所有属性
request_id = 3
request = {"clientKeys": "model", "sharedKeys": "config", "serverKeys": "active"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
响应会发布到 v1/devices/me/attributes/response/{request_id} 主题。
使用 v2 短主题(更高效)
# 使用 v2 短主题
client.subscribe('v2/a', qos=1) # 属性更新
client.subscribe('v2/a/res/+', qos=1) # 属性响应
client.subscribe('v2/r/req/+', qos=1) # RPC 请求
# 请求属性(v2 格式)
request_id = 1
request = {"clientKeys": "model"}
client.publish(f'v2/a/req/{request_id}', json.dumps(request), qos=1)
注意事项
-
遥测数据订阅:MQTT 协议中,遥测数据通常由设备发布给服务器,而不是服务器推送给设备。如果需要实时获取遥测数据,建议:
- 使用 WebSocket 订阅(见下方)
- 或通过 RPC 请求设备主动发送遥测数据
-
设备状态:设备状态(
active)存储在服务器属性中,可以通过属性请求获取:request = {"serverKeys": "active,lastConnectTime,lastActivityTime"} client.publish(f'v1/devices/me/attributes/request/1', json.dumps(request), qos=1) -
QoS 级别:推荐使用 QoS 1(至少一次),确保消息不丢失
-
连接认证:使用设备的 Access Token 作为 MQTT 用户名,密码留空
2. WebSocket 订阅(应用端,推荐)
ThingsBoard 支持通过 WebSocket 实时订阅设备遥测和属性的更新。适用于应用端(如 Web 应用、后端服务)查询和订阅设备数据。
WebSocket 连接:
ws://localhost:8080/api/ws/plugins/telemetry
认证方式:
- 使用 JWT Token 作为查询参数:
?token=$JWT_TOKEN - 或使用 Bearer Token 在连接头中
订阅命令格式:
{
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId}",
"scope": "LATEST_TELEMETRY",
"cmdId": 1
}
],
"attrSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId}",
"scope": "SERVER_SCOPE",
"cmdId": 2,
"keys": "active"
}
]
}
JavaScript 示例:
// 连接 WebSocket
const token = 'YOUR_JWT_TOKEN';
const ws = new WebSocket(`ws://localhost:8080/api/ws/plugins/telemetry?token=${token}`);
ws.onopen = function() {
console.log('WebSocket connected');
// 订阅遥测数据
const subscribeCmd = {
tsSubCmds: [
{
entityType: 'DEVICE',
entityId: 'YOUR_DEVICE_ID',
scope: 'LATEST_TELEMETRY',
cmdId: 1
}
],
attrSubCmds: [
{
entityType: 'DEVICE',
entityId: 'YOUR_DEVICE_ID',
scope: 'SERVER_SCOPE',
cmdId: 2,
keys: 'active'
}
]
};
ws.send(JSON.stringify(subscribeCmd));
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Received update:', data);
// 处理遥测更新
if (data.subscriptionId === 1) {
console.log('Telemetry update:', data.data);
}
// 处理属性更新
if (data.subscriptionId === 2) {
console.log('Attribute update:', data.data);
}
};
ws.onerror = function(error) {
console.error('WebSocket error:', error);
};
ws.onclose = function() {
console.log('WebSocket closed');
};
Python 示例:
import asyncio
import websockets
import json
async def subscribe_to_device():
token = 'YOUR_JWT_TOKEN'
device_id = 'YOUR_DEVICE_ID'
uri = f'ws://localhost:8080/api/ws/plugins/telemetry?token={token}'
async with websockets.connect(uri) as websocket:
# 订阅命令
subscribe_cmd = {
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": device_id,
"scope": "LATEST_TELEMETRY",
"cmdId": 1
}
],
"attrSubCmds": [
{
"entityType": "DEVICE",
"entityId": device_id,
"scope": "SERVER_SCOPE",
"cmdId": 2,
"keys": "active"
}
]
}
await websocket.send(json.dumps(subscribe_cmd))
print("Subscribed to device updates")
# 接收更新
while True:
message = await websocket.recv()
data = json.loads(message)
if 'subscriptionId' in data:
if data['subscriptionId'] == 1:
print(f"Telemetry update: {data.get('data', {})}")
elif data['subscriptionId'] == 2:
print(f"Attribute update: {data.get('data', {})}")
# 运行
asyncio.run(subscribe_to_device())
2. 订阅特定键的遥测
{
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId}",
"scope": "LATEST_TELEMETRY",
"cmdId": 1,
"keys": "temperature,humidity"
}
]
}
3. 订阅多个设备
{
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId1}",
"scope": "LATEST_TELEMETRY",
"cmdId": 1
},
{
"entityType": "DEVICE",
"entityId": "{deviceId2}",
"scope": "LATEST_TELEMETRY",
"cmdId": 2
}
]
}
使用示例
完整示例:查询和监控设备
Python 脚本示例:
import requests
import websockets
import json
import asyncio
from datetime import datetime
# 配置
THINGSBOARD_HOST = "localhost:8080"
JWT_TOKEN = "YOUR_JWT_TOKEN"
DEVICE_ID = "YOUR_DEVICE_ID"
def get_latest_telemetry():
"""获取最新遥测数据"""
url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/timeseries"
headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
print(f"Error: {response.status_code}")
return None
def get_device_status():
"""获取设备状态"""
url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/attributes/SERVER_SCOPE"
params = {"keys": "active,lastConnectTime,lastActivityTime"}
headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Error: {response.status_code}")
return None
async def subscribe_to_device():
"""订阅设备实时更新"""
uri = f"ws://{THINGSBOARD_HOST}/api/ws/plugins/telemetry?token={JWT_TOKEN}"
async with websockets.connect(uri) as websocket:
# 订阅遥测和属性
subscribe_cmd = {
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": DEVICE_ID,
"scope": "LATEST_TELEMETRY",
"cmdId": 1
}
],
"attrSubCmds": [
{
"entityType": "DEVICE",
"entityId": DEVICE_ID,
"scope": "SERVER_SCOPE",
"cmdId": 2,
"keys": "active"
}
]
}
await websocket.send(json.dumps(subscribe_cmd))
print("已订阅设备更新,等待数据...")
while True:
message = await websocket.recv()
data = json.loads(message)
if 'subscriptionId' in data:
if data['subscriptionId'] == 1:
# 遥测更新
telemetry = data.get('data', {})
for key, values in telemetry.items():
if values:
value = values[0]
print(f"[{datetime.now()}] 遥测更新 - {key}: {value['value']} (时间: {value['ts']})")
elif data['subscriptionId'] == 2:
# 属性更新
attributes = data.get('data', {})
for attr in attributes:
print(f"[{datetime.now()}] 属性更新 - {attr['key']}: {attr['value']}")
if __name__ == "__main__":
# 查询当前状态
print("=== 查询当前遥测数据 ===")
telemetry = get_latest_telemetry()
if telemetry:
for key, values in telemetry.items():
if values:
print(f"{key}: {values[0]['value']} (时间: {values[0]['ts']})")
print("\n=== 查询设备状态 ===")
status = get_device_status()
if status:
for attr in status:
print(f"{attr['key']}: {attr['value']}")
print("\n=== 开始实时订阅 ===")
asyncio.run(subscribe_to_device())
curl 脚本示例:
#!/bin/bash
THINGSBOARD_HOST="localhost:8080"
JWT_TOKEN="YOUR_JWT_TOKEN"
DEVICE_ID="YOUR_DEVICE_ID"
echo "=== 获取最新遥测数据 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
echo -e "\n=== 获取设备状态 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/attributes/SERVER_SCOPE?keys=active" \
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
echo -e "\n=== 获取所有遥测键 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/keys/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
注意事项
- 认证: 所有 API 调用都需要有效的 JWT Token
- 权限: 确保用户有权限访问设备(
TENANT_ADMIN或CUSTOMER_USER) - 设备 ID: 使用设备的 UUID,不是设备名称
- 时间戳: 所有时间戳都是 UTC 时间,单位是毫秒
- WebSocket: 订阅是实时的,适合需要即时更新的场景
- 轮询 vs 订阅:
- 轮询(REST API):适合不频繁查询的场景
- 订阅(WebSocket):适合需要实时更新的场景,减少服务器负载
外部服务订阅方案(重要)
如果你的外部服务需要订阅设备发送给 ThingsBoard 的实时遥测数据,有以下几种方案:
方案 1:通过规则引擎转发到外部 MQTT Broker(推荐)
适用场景:外部服务可以连接到外部 MQTT Broker 进行订阅
实现步骤:
-
在 ThingsBoard 规则引擎中添加 MQTT 节点
- 进入 ThingsBoard Web UI
- 打开规则链(Rule Chain)编辑器
- 添加 "MQTT" 节点(External → MQTT)
- 配置外部 MQTT Broker 连接信息
-
配置 MQTT 节点
{ "host": "external-mqtt-broker.example.com", "port": 1883, "topicPattern": "thingsboard/telemetry/${deviceName}", "clientId": "tb-forwarder", "cleanSession": true, "retainedMessage": false, "ssl": false, "credentials": { "type": "anonymous" } } -
连接规则链
[设备遥测数据] → [MQTT 节点] → [外部 MQTT Broker] ↓ [外部服务订阅] -
外部服务订阅
import paho.mqtt.client as mqtt def on_connect(client, userdata, flags, rc): client.subscribe("thingsboard/telemetry/+") def on_message(client, userdata, msg): print(f"收到设备数据: {msg.topic} - {msg.payload}") client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("external-mqtt-broker.example.com", 1883, 60) client.loop_forever()
优势:
- ✅ 实时转发,延迟低
- ✅ 支持多个外部服务订阅同一主题
- ✅ 解耦 ThingsBoard 和外部服务
- ✅ 支持 QoS、认证等 MQTT 特性
方案 2:通过规则引擎转发到 Kafka
适用场景:外部服务可以连接 Kafka 进行消费
实现步骤:
-
在规则引擎中添加 Kafka 节点
- 添加 "Kafka" 节点(External → Kafka)
- 配置 Kafka Broker 信息
-
配置 Kafka 节点
{ "bootstrapServers": "kafka-broker:9092", "topicPattern": "thingsboard-telemetry", "acks": "1", "retries": 3, "batchSize": 16384, "linger": 5, "bufferMemory": 33554432 } -
外部服务消费 Kafka
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'thingsboard-telemetry', bootstrap_servers=['kafka-broker:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: print(f"收到设备数据: {message.value}")
方案 3:通过规则引擎调用 HTTP 接口
适用场景:外部服务提供 HTTP Webhook 接口
实现步骤:
-
在规则引擎中添加 REST API Call 节点
- 添加 "REST API Call" 节点(External → REST API Call)
- 配置外部服务的 HTTP 端点
-
配置 REST API Call 节点
{ "restEndpointUrlPattern": "http://external-service:8080/api/telemetry", "requestMethod": "POST", "headers": { "Content-Type": "application/json", "Authorization": "Bearer YOUR_API_KEY" } } -
外部服务接收数据
from flask import Flask, request app = Flask(__name__) @app.route('/api/telemetry', methods=['POST']) def receive_telemetry(): data = request.json print(f"收到设备数据: {data}") # 处理数据... return {"status": "ok"}, 200 app.run(host='0.0.0.0', port=8080)
方案 4:直接订阅 ThingsBoard 内部 Kafka(如果使用 Kafka 队列)
适用场景:ThingsBoard 使用 Kafka 作为消息队列,且外部服务可以访问同一 Kafka 集群
实现步骤:
-
确认 ThingsBoard 使用的 Kafka 主题
- ThingsBoard 内部使用以下主题:
tb.core.*- 核心服务消息tb.rule-engine.*- 规则引擎消息tb.transport.*- 传输层消息
- ThingsBoard 内部使用以下主题:
-
外部服务订阅 Kafka
from kafka import KafkaConsumer import json # 注意:需要了解 ThingsBoard 内部消息格式(Protobuf) consumer = KafkaConsumer( 'tb.core.telemetry', # 示例主题,需要根据实际情况调整 bootstrap_servers=['kafka:9092'], group_id='external-service' ) for message in consumer: # ThingsBoard 使用 Protobuf 格式,需要解析 print(f"收到消息: {message.value}")
注意:此方案需要了解 ThingsBoard 内部消息格式,不推荐。
方案对比
| 方案 | 实时性 | 复杂度 | 推荐度 | 适用场景 |
|---|---|---|---|---|
| 外部 MQTT Broker | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 最适合 IoT 场景 |
| Kafka | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 高吞吐量场景 |
| HTTP Webhook | ⭐⭐⭐ | ⭐ | ⭐⭐⭐ | 简单集成场景 |
| 直接订阅 Kafka | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | 不推荐 |
推荐配置示例:外部 MQTT Broker
完整配置流程:
-
部署外部 MQTT Broker(如 Mosquitto)
docker run -it -p 1883:1883 -p 9001:9001 \ -v mosquitto.conf:/mosquitto/config/mosquitto.conf \ eclipse-mosquitto -
在 ThingsBoard 规则链中配置
- 规则链:
Root Rule Chain或设备特定的规则链 - 节点连接:
Message Type Switch→POST_TELEMETRY_REQUEST→MQTT Node
- 规则链:
-
MQTT 节点配置
{ "host": "192.168.1.100", "port": 1883, "topicPattern": "devices/${deviceName}/telemetry", "clientId": "tb-forwarder-${serviceId}", "cleanSession": true, "retainedMessage": false, "ssl": false, "credentials": { "type": "anonymous" } } -
外部服务订阅
import paho.mqtt.client as mqtt import json def on_connect(client, userdata, flags, rc): # 订阅所有设备的遥测数据 client.subscribe("devices/+/telemetry", qos=1) print("已订阅设备遥测数据") def on_message(client, userdata, msg): topic_parts = msg.topic.split('/') device_name = topic_parts[1] telemetry = json.loads(msg.payload) print(f"设备: {device_name}") print(f"数据: {telemetry}") # 处理遥测数据... client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("192.168.1.100", 1883, 60) client.loop_forever()