1021 lines
29 KiB
Markdown
1021 lines
29 KiB
Markdown
# ThingsBoard 设备状态和遥测数据查询订阅指南
|
||
|
||
## 目录
|
||
|
||
1. [查询设备遥测数据](#查询设备遥测数据)
|
||
2. [查询设备属性](#查询设备属性)
|
||
3. [查询设备状态](#查询设备状态)
|
||
4. [订阅实时更新](#订阅实时更新)
|
||
5. [使用示例](#使用示例)
|
||
|
||
---
|
||
|
||
## 查询设备遥测数据
|
||
|
||
### 1. 获取最新的遥测值(推荐)
|
||
|
||
**REST API 端点:**
|
||
```
|
||
GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries
|
||
```
|
||
|
||
**参数说明:**
|
||
- `entityType`: 实体类型,通常是 `DEVICE`
|
||
- `entityId`: 设备 ID(UUID)
|
||
- `keys` (可选): 遥测键名,多个用逗号分隔,如 `temperature,humidity`。不指定则返回所有键
|
||
- `useStrictDataTypes` (可选): 是否使用严格数据类型,默认 `false`(值转为字符串)
|
||
|
||
**示例:**
|
||
|
||
```bash
|
||
# 获取所有最新遥测值
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
|
||
# 获取指定键的最新遥测值
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature,humidity" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
|
||
# 使用严格数据类型(返回原始类型)
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&useStrictDataTypes=true" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
**响应格式(默认,字符串类型):**
|
||
```json
|
||
{
|
||
"temperature": [
|
||
{
|
||
"ts": 1705732800000,
|
||
"value": "25.5"
|
||
}
|
||
],
|
||
"humidity": [
|
||
{
|
||
"ts": 1705732800000,
|
||
"value": "60.2"
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
**响应格式(useStrictDataTypes=true,原始类型):**
|
||
```json
|
||
{
|
||
"temperature": [
|
||
{
|
||
"ts": 1705732800000,
|
||
"value": 25.5
|
||
}
|
||
],
|
||
"humidity": [
|
||
{
|
||
"ts": 1705732800000,
|
||
"value": 60.2
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
### 2. 获取历史遥测数据
|
||
|
||
**REST API 端点:**
|
||
```
|
||
GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries?keys={keys}&startTs={startTs}&endTs={endTs}
|
||
```
|
||
|
||
**参数说明:**
|
||
- `keys`: 遥测键名,多个用逗号分隔
|
||
- `startTs`: 开始时间戳(毫秒,UTC)
|
||
- `endTs`: 结束时间戳(毫秒,UTC)
|
||
- `interval` (可选): 聚合间隔(毫秒),默认 0(不聚合)
|
||
- `intervalType` (可选): 聚合类型,如 `MILLISECONDS`, `SECONDS`, `MINUTES`, `HOURS`, `DAYS`, `WEEKS`, `MONTHS`
|
||
- `agg` (可选): 聚合函数,如 `MIN`, `MAX`, `AVG`, `SUM`, `COUNT`
|
||
- `limit` (可选): 最大返回数据点数,默认 100
|
||
- `orderBy` (可选): 排序方式,`ASC` 或 `DESC`,默认 `DESC`
|
||
|
||
**示例:**
|
||
|
||
```bash
|
||
# 获取最近1小时的数据
|
||
START_TS=$(($(date +%s) * 1000 - 3600000))
|
||
END_TS=$(($(date +%s) * 1000))
|
||
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&limit=100" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
|
||
# 获取聚合数据(每小时平均值)
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&interval=3600000&agg=AVG" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
**响应格式:**
|
||
```json
|
||
[
|
||
{
|
||
"ts": 1705732800000,
|
||
"value": "25.5"
|
||
},
|
||
{
|
||
"ts": 1705736400000,
|
||
"value": "26.0"
|
||
}
|
||
]
|
||
```
|
||
|
||
### 3. 获取所有遥测键名
|
||
|
||
```bash
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/keys/timeseries" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
**响应格式:**
|
||
```json
|
||
["temperature", "humidity", "pressure", "batteryLevel"]
|
||
```
|
||
|
||
---
|
||
|
||
## 查询设备属性
|
||
|
||
### 1. 获取设备属性值
|
||
|
||
**REST API 端点:**
|
||
```
|
||
GET /api/plugins/telemetry/{entityType}/{entityId}/values/attributes
|
||
```
|
||
|
||
**参数说明:**
|
||
- `keys` (可选): 属性键名,多个用逗号分隔
|
||
|
||
**示例:**
|
||
|
||
```bash
|
||
# 获取所有属性
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
|
||
# 获取指定属性
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes?keys=model,firmware" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
### 2. 按作用域获取属性
|
||
|
||
属性分为三种作用域:
|
||
- `SERVER_SCOPE`: 服务器端属性(ThingsBoard 管理)
|
||
- `SHARED_SCOPE`: 共享属性(服务器和客户端共享)
|
||
- `CLIENT_SCOPE`: 客户端属性(设备端管理)
|
||
|
||
```bash
|
||
# 获取服务器端属性
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
|
||
# 获取客户端属性
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/CLIENT_SCOPE" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
---
|
||
|
||
## 查询设备状态
|
||
|
||
### 1. 设备在线状态
|
||
|
||
设备状态存储在 `SERVER_SCOPE` 属性中,键名为 `active`。
|
||
|
||
```bash
|
||
# 查询设备是否在线
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
**响应格式:**
|
||
```json
|
||
[
|
||
{
|
||
"key": "active",
|
||
"value": true,
|
||
"lastUpdateTs": 1705732800000
|
||
}
|
||
]
|
||
```
|
||
|
||
### 2. 设备连接信息
|
||
|
||
设备状态还包括以下信息(存储在 SERVER_SCOPE 属性或遥测中):
|
||
- `lastConnectTime`: 最后连接时间
|
||
- `lastDisconnectTime`: 最后断开时间
|
||
- `lastActivityTime`: 最后活动时间
|
||
|
||
```bash
|
||
# 查询设备连接信息
|
||
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active,lastConnectTime,lastActivityTime" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN"
|
||
```
|
||
|
||
---
|
||
|
||
## 订阅实时更新
|
||
|
||
### 1. MQTT 订阅(设备端)
|
||
|
||
ThingsBoard 支持通过 MQTT 协议订阅设备属性更新和 RPC 请求。**注意:** MQTT 主要用于设备端接收服务器推送的属性更新,遥测数据通常由设备发布给服务器。
|
||
|
||
#### 可订阅的 MQTT 主题
|
||
|
||
**v1 主题格式:**
|
||
|
||
| 主题 | 说明 | 用途 |
|
||
|------|------|------|
|
||
| `v1/devices/me/attributes` | 属性更新 | 订阅服务器推送给设备的属性更新(共享属性和服务器属性) |
|
||
| `v1/devices/me/attributes/response/+` | 属性请求响应 | 订阅属性请求的响应(`+` 是请求 ID) |
|
||
| `v1/devices/me/rpc/request/+` | RPC 请求 | 订阅服务器发送的 RPC 请求(`+` 是请求 ID) |
|
||
| `v1/devices/me/rpc/response/+` | RPC 响应 | 订阅 RPC 响应的主题(用于客户端 RPC) |
|
||
|
||
**v2 短主题格式(更高效):**
|
||
|
||
| 主题 | 说明 | 对应 v1 主题 |
|
||
|------|------|-------------|
|
||
| `v2/a` | 属性更新(短格式) | `v1/devices/me/attributes` |
|
||
| `v2/a/res/+` | 属性响应(短格式) | `v1/devices/me/attributes/response/+` |
|
||
| `v2/r/req/+` | RPC 请求(短格式) | `v1/devices/me/rpc/request/+` |
|
||
| `v2/r/res/+` | RPC 响应(短格式) | `v1/devices/me/rpc/response/+` |
|
||
|
||
#### Python MQTT 订阅示例
|
||
|
||
```python
|
||
import paho.mqtt.client as mqtt
|
||
import json
|
||
|
||
# MQTT 连接配置
|
||
THINGSBOARD_HOST = "localhost"
|
||
THINGSBOARD_PORT = 1883
|
||
ACCESS_TOKEN = "YOUR_DEVICE_ACCESS_TOKEN"
|
||
|
||
def on_connect(client, userdata, flags, rc):
|
||
"""连接成功回调"""
|
||
if rc == 0:
|
||
print("Connected to ThingsBoard")
|
||
|
||
# 订阅属性更新
|
||
client.subscribe('v1/devices/me/attributes', qos=1)
|
||
print("Subscribed to: v1/devices/me/attributes")
|
||
|
||
# 订阅属性请求响应
|
||
client.subscribe('v1/devices/me/attributes/response/+', qos=1)
|
||
print("Subscribed to: v1/devices/me/attributes/response/+")
|
||
|
||
# 订阅 RPC 请求
|
||
client.subscribe('v1/devices/me/rpc/request/+', qos=1)
|
||
print("Subscribed to: v1/devices/me/rpc/request/+")
|
||
|
||
# 请求服务器端属性(可选)
|
||
request_id = 1
|
||
request = {"keys": "active,lastConnectTime"}
|
||
client.publish(
|
||
f'v1/devices/me/attributes/request/{request_id}',
|
||
json.dumps(request),
|
||
qos=1
|
||
)
|
||
print(f"Requested attributes: {request}")
|
||
else:
|
||
print(f"Failed to connect, return code {rc}")
|
||
|
||
def on_message(client, userdata, msg):
|
||
"""消息接收回调"""
|
||
topic = msg.topic
|
||
payload = msg.payload.decode('utf-8')
|
||
|
||
print(f"\n收到消息 - 主题: {topic}")
|
||
print(f"消息内容: {payload}")
|
||
|
||
try:
|
||
data = json.loads(payload)
|
||
|
||
# 处理属性更新
|
||
if topic == 'v1/devices/me/attributes':
|
||
print("=== 属性更新 ===")
|
||
for key, value in data.items():
|
||
print(f" {key}: {value}")
|
||
|
||
# 处理属性请求响应
|
||
elif topic.startswith('v1/devices/me/attributes/response/'):
|
||
request_id = topic.split('/')[-1]
|
||
print(f"=== 属性请求响应 (ID: {request_id}) ===")
|
||
if 'client' in data:
|
||
print("客户端属性:")
|
||
for key, value in data['client'].items():
|
||
print(f" {key}: {value}")
|
||
if 'shared' in data:
|
||
print("共享属性:")
|
||
for key, value in data['shared'].items():
|
||
print(f" {key}: {value}")
|
||
if 'server' in data:
|
||
print("服务器属性:")
|
||
for key, value in data['server'].items():
|
||
print(f" {key}: {value}")
|
||
|
||
# 处理 RPC 请求
|
||
elif topic.startswith('v1/devices/me/rpc/request/'):
|
||
request_id = topic.split('/')[-1]
|
||
print(f"=== RPC 请求 (ID: {request_id}) ===")
|
||
method = data.get('method', '')
|
||
params = data.get('params', {})
|
||
print(f"方法: {method}")
|
||
print(f"参数: {params}")
|
||
|
||
# 处理 RPC 请求并回复
|
||
if method == 'getCurrentTime':
|
||
response = {
|
||
"time": "2025-01-20 10:30:00"
|
||
}
|
||
elif method == 'setGpio':
|
||
# 处理 GPIO 设置
|
||
pin = params.get('pin')
|
||
value = params.get('value')
|
||
print(f"设置 GPIO Pin {pin} = {value}")
|
||
response = {"success": True, "pin": pin, "value": value}
|
||
else:
|
||
response = {"error": f"Unknown method: {method}"}
|
||
|
||
# 发送 RPC 响应
|
||
response_topic = f'v1/devices/me/rpc/response/{request_id}'
|
||
client.publish(response_topic, json.dumps(response), qos=1)
|
||
print(f"已发送 RPC 响应到: {response_topic}")
|
||
|
||
except json.JSONDecodeError:
|
||
print(f"无法解析 JSON: {payload}")
|
||
|
||
def on_disconnect(client, userdata, rc):
|
||
"""断开连接回调"""
|
||
print("Disconnected from ThingsBoard")
|
||
|
||
# 创建 MQTT 客户端
|
||
client = mqtt.Client()
|
||
client.on_connect = on_connect
|
||
client.on_message = on_message
|
||
client.on_disconnect = on_disconnect
|
||
|
||
# 设置认证(使用 Access Token 作为用户名)
|
||
client.username_pw_set(ACCESS_TOKEN)
|
||
|
||
# 连接到 ThingsBoard
|
||
try:
|
||
client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
|
||
client.loop_forever()
|
||
except Exception as e:
|
||
print(f"连接失败: {e}")
|
||
```
|
||
|
||
#### 请求设备属性(通过 MQTT)
|
||
|
||
设备可以通过发布请求消息来获取属性值:
|
||
|
||
```python
|
||
# 请求客户端属性
|
||
request_id = 1
|
||
request = {"clientKeys": "model,version"}
|
||
client.publish(
|
||
f'v1/devices/me/attributes/request/{request_id}',
|
||
json.dumps(request),
|
||
qos=1
|
||
)
|
||
|
||
# 请求共享属性
|
||
request_id = 2
|
||
request = {"sharedKeys": "config"}
|
||
client.publish(
|
||
f'v1/devices/me/attributes/request/{request_id}',
|
||
json.dumps(request),
|
||
qos=1
|
||
)
|
||
|
||
# 请求所有属性
|
||
request_id = 3
|
||
request = {"clientKeys": "model", "sharedKeys": "config", "serverKeys": "active"}
|
||
client.publish(
|
||
f'v1/devices/me/attributes/request/{request_id}',
|
||
json.dumps(request),
|
||
qos=1
|
||
)
|
||
```
|
||
|
||
响应会发布到 `v1/devices/me/attributes/response/{request_id}` 主题。
|
||
|
||
#### 使用 v2 短主题(更高效)
|
||
|
||
```python
|
||
# 使用 v2 短主题
|
||
client.subscribe('v2/a', qos=1) # 属性更新
|
||
client.subscribe('v2/a/res/+', qos=1) # 属性响应
|
||
client.subscribe('v2/r/req/+', qos=1) # RPC 请求
|
||
|
||
# 请求属性(v2 格式)
|
||
request_id = 1
|
||
request = {"clientKeys": "model"}
|
||
client.publish(f'v2/a/req/{request_id}', json.dumps(request), qos=1)
|
||
```
|
||
|
||
#### 注意事项
|
||
|
||
1. **遥测数据订阅**:MQTT 协议中,遥测数据通常由设备发布给服务器,而不是服务器推送给设备。如果需要实时获取遥测数据,建议:
|
||
- 使用 WebSocket 订阅(见下方)
|
||
- 或通过 RPC 请求设备主动发送遥测数据
|
||
|
||
2. **设备状态**:设备状态(`active`)存储在服务器属性中,可以通过属性请求获取:
|
||
```python
|
||
request = {"serverKeys": "active,lastConnectTime,lastActivityTime"}
|
||
client.publish(f'v1/devices/me/attributes/request/1', json.dumps(request), qos=1)
|
||
```
|
||
|
||
3. **QoS 级别**:推荐使用 QoS 1(至少一次),确保消息不丢失
|
||
|
||
4. **连接认证**:使用设备的 Access Token 作为 MQTT 用户名,密码留空
|
||
|
||
---
|
||
|
||
### 2. WebSocket 订阅(应用端,推荐)
|
||
|
||
ThingsBoard 支持通过 WebSocket 实时订阅设备遥测和属性的更新。**适用于应用端(如 Web 应用、后端服务)查询和订阅设备数据。**
|
||
|
||
**WebSocket 连接:**
|
||
```
|
||
ws://localhost:8080/api/ws/plugins/telemetry
|
||
```
|
||
|
||
**认证方式:**
|
||
- 使用 JWT Token 作为查询参数:`?token=$JWT_TOKEN`
|
||
- 或使用 Bearer Token 在连接头中
|
||
|
||
**订阅命令格式:**
|
||
|
||
```json
|
||
{
|
||
"tsSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": "{deviceId}",
|
||
"scope": "LATEST_TELEMETRY",
|
||
"cmdId": 1
|
||
}
|
||
],
|
||
"attrSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": "{deviceId}",
|
||
"scope": "SERVER_SCOPE",
|
||
"cmdId": 2,
|
||
"keys": "active"
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
**JavaScript 示例:**
|
||
|
||
```javascript
|
||
// 连接 WebSocket
|
||
const token = 'YOUR_JWT_TOKEN';
|
||
const ws = new WebSocket(`ws://localhost:8080/api/ws/plugins/telemetry?token=${token}`);
|
||
|
||
ws.onopen = function() {
|
||
console.log('WebSocket connected');
|
||
|
||
// 订阅遥测数据
|
||
const subscribeCmd = {
|
||
tsSubCmds: [
|
||
{
|
||
entityType: 'DEVICE',
|
||
entityId: 'YOUR_DEVICE_ID',
|
||
scope: 'LATEST_TELEMETRY',
|
||
cmdId: 1
|
||
}
|
||
],
|
||
attrSubCmds: [
|
||
{
|
||
entityType: 'DEVICE',
|
||
entityId: 'YOUR_DEVICE_ID',
|
||
scope: 'SERVER_SCOPE',
|
||
cmdId: 2,
|
||
keys: 'active'
|
||
}
|
||
]
|
||
};
|
||
|
||
ws.send(JSON.stringify(subscribeCmd));
|
||
};
|
||
|
||
ws.onmessage = function(event) {
|
||
const data = JSON.parse(event.data);
|
||
console.log('Received update:', data);
|
||
|
||
// 处理遥测更新
|
||
if (data.subscriptionId === 1) {
|
||
console.log('Telemetry update:', data.data);
|
||
}
|
||
|
||
// 处理属性更新
|
||
if (data.subscriptionId === 2) {
|
||
console.log('Attribute update:', data.data);
|
||
}
|
||
};
|
||
|
||
ws.onerror = function(error) {
|
||
console.error('WebSocket error:', error);
|
||
};
|
||
|
||
ws.onclose = function() {
|
||
console.log('WebSocket closed');
|
||
};
|
||
```
|
||
|
||
**Python 示例:**
|
||
|
||
```python
|
||
import asyncio
|
||
import websockets
|
||
import json
|
||
|
||
async def subscribe_to_device():
|
||
token = 'YOUR_JWT_TOKEN'
|
||
device_id = 'YOUR_DEVICE_ID'
|
||
|
||
uri = f'ws://localhost:8080/api/ws/plugins/telemetry?token={token}'
|
||
|
||
async with websockets.connect(uri) as websocket:
|
||
# 订阅命令
|
||
subscribe_cmd = {
|
||
"tsSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": device_id,
|
||
"scope": "LATEST_TELEMETRY",
|
||
"cmdId": 1
|
||
}
|
||
],
|
||
"attrSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": device_id,
|
||
"scope": "SERVER_SCOPE",
|
||
"cmdId": 2,
|
||
"keys": "active"
|
||
}
|
||
]
|
||
}
|
||
|
||
await websocket.send(json.dumps(subscribe_cmd))
|
||
print("Subscribed to device updates")
|
||
|
||
# 接收更新
|
||
while True:
|
||
message = await websocket.recv()
|
||
data = json.loads(message)
|
||
|
||
if 'subscriptionId' in data:
|
||
if data['subscriptionId'] == 1:
|
||
print(f"Telemetry update: {data.get('data', {})}")
|
||
elif data['subscriptionId'] == 2:
|
||
print(f"Attribute update: {data.get('data', {})}")
|
||
|
||
# 运行
|
||
asyncio.run(subscribe_to_device())
|
||
```
|
||
|
||
### 2. 订阅特定键的遥测
|
||
|
||
```json
|
||
{
|
||
"tsSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": "{deviceId}",
|
||
"scope": "LATEST_TELEMETRY",
|
||
"cmdId": 1,
|
||
"keys": "temperature,humidity"
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
### 3. 订阅多个设备
|
||
|
||
```json
|
||
{
|
||
"tsSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": "{deviceId1}",
|
||
"scope": "LATEST_TELEMETRY",
|
||
"cmdId": 1
|
||
},
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": "{deviceId2}",
|
||
"scope": "LATEST_TELEMETRY",
|
||
"cmdId": 2
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 使用示例
|
||
|
||
### 完整示例:查询和监控设备
|
||
|
||
**Python 脚本示例:**
|
||
|
||
```python
|
||
import requests
|
||
import websockets
|
||
import json
|
||
import asyncio
|
||
from datetime import datetime
|
||
|
||
# 配置
|
||
THINGSBOARD_HOST = "localhost:8080"
|
||
JWT_TOKEN = "YOUR_JWT_TOKEN"
|
||
DEVICE_ID = "YOUR_DEVICE_ID"
|
||
|
||
def get_latest_telemetry():
|
||
"""获取最新遥测数据"""
|
||
url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/timeseries"
|
||
headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
|
||
|
||
response = requests.get(url, headers=headers)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
print(f"Error: {response.status_code}")
|
||
return None
|
||
|
||
def get_device_status():
|
||
"""获取设备状态"""
|
||
url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/attributes/SERVER_SCOPE"
|
||
params = {"keys": "active,lastConnectTime,lastActivityTime"}
|
||
headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
|
||
|
||
response = requests.get(url, headers=headers, params=params)
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
else:
|
||
print(f"Error: {response.status_code}")
|
||
return None
|
||
|
||
async def subscribe_to_device():
|
||
"""订阅设备实时更新"""
|
||
uri = f"ws://{THINGSBOARD_HOST}/api/ws/plugins/telemetry?token={JWT_TOKEN}"
|
||
|
||
async with websockets.connect(uri) as websocket:
|
||
# 订阅遥测和属性
|
||
subscribe_cmd = {
|
||
"tsSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": DEVICE_ID,
|
||
"scope": "LATEST_TELEMETRY",
|
||
"cmdId": 1
|
||
}
|
||
],
|
||
"attrSubCmds": [
|
||
{
|
||
"entityType": "DEVICE",
|
||
"entityId": DEVICE_ID,
|
||
"scope": "SERVER_SCOPE",
|
||
"cmdId": 2,
|
||
"keys": "active"
|
||
}
|
||
]
|
||
}
|
||
|
||
await websocket.send(json.dumps(subscribe_cmd))
|
||
print("已订阅设备更新,等待数据...")
|
||
|
||
while True:
|
||
message = await websocket.recv()
|
||
data = json.loads(message)
|
||
|
||
if 'subscriptionId' in data:
|
||
if data['subscriptionId'] == 1:
|
||
# 遥测更新
|
||
telemetry = data.get('data', {})
|
||
for key, values in telemetry.items():
|
||
if values:
|
||
value = values[0]
|
||
print(f"[{datetime.now()}] 遥测更新 - {key}: {value['value']} (时间: {value['ts']})")
|
||
elif data['subscriptionId'] == 2:
|
||
# 属性更新
|
||
attributes = data.get('data', {})
|
||
for attr in attributes:
|
||
print(f"[{datetime.now()}] 属性更新 - {attr['key']}: {attr['value']}")
|
||
|
||
if __name__ == "__main__":
|
||
# 查询当前状态
|
||
print("=== 查询当前遥测数据 ===")
|
||
telemetry = get_latest_telemetry()
|
||
if telemetry:
|
||
for key, values in telemetry.items():
|
||
if values:
|
||
print(f"{key}: {values[0]['value']} (时间: {values[0]['ts']})")
|
||
|
||
print("\n=== 查询设备状态 ===")
|
||
status = get_device_status()
|
||
if status:
|
||
for attr in status:
|
||
print(f"{attr['key']}: {attr['value']}")
|
||
|
||
print("\n=== 开始实时订阅 ===")
|
||
asyncio.run(subscribe_to_device())
|
||
```
|
||
|
||
**curl 脚本示例:**
|
||
|
||
```bash
|
||
#!/bin/bash
|
||
|
||
THINGSBOARD_HOST="localhost:8080"
|
||
JWT_TOKEN="YOUR_JWT_TOKEN"
|
||
DEVICE_ID="YOUR_DEVICE_ID"
|
||
|
||
echo "=== 获取最新遥测数据 ==="
|
||
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/timeseries" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
|
||
|
||
echo -e "\n=== 获取设备状态 ==="
|
||
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/attributes/SERVER_SCOPE?keys=active" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
|
||
|
||
echo -e "\n=== 获取所有遥测键 ==="
|
||
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/keys/timeseries" \
|
||
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
|
||
```
|
||
|
||
---
|
||
|
||
## 注意事项
|
||
|
||
1. **认证:** 所有 API 调用都需要有效的 JWT Token
|
||
2. **权限:** 确保用户有权限访问设备(`TENANT_ADMIN` 或 `CUSTOMER_USER`)
|
||
3. **设备 ID:** 使用设备的 UUID,不是设备名称
|
||
4. **时间戳:** 所有时间戳都是 UTC 时间,单位是毫秒
|
||
5. **WebSocket:** 订阅是实时的,适合需要即时更新的场景
|
||
6. **轮询 vs 订阅:**
|
||
- 轮询(REST API):适合不频繁查询的场景
|
||
- 订阅(WebSocket):适合需要实时更新的场景,减少服务器负载
|
||
|
||
---
|
||
|
||
---
|
||
|
||
## 外部服务订阅方案(重要)
|
||
|
||
如果你的外部服务需要订阅设备发送给 ThingsBoard 的实时遥测数据,有以下几种方案:
|
||
|
||
### 方案 1:通过规则引擎转发到外部 MQTT Broker(推荐)
|
||
|
||
**适用场景**:外部服务可以连接到外部 MQTT Broker 进行订阅
|
||
|
||
**实现步骤**:
|
||
|
||
1. **在 ThingsBoard 规则引擎中添加 MQTT 节点**
|
||
- 进入 ThingsBoard Web UI
|
||
- 打开规则链(Rule Chain)编辑器
|
||
- 添加 "MQTT" 节点(External → MQTT)
|
||
- 配置外部 MQTT Broker 连接信息
|
||
|
||
2. **配置 MQTT 节点**
|
||
```json
|
||
{
|
||
"host": "external-mqtt-broker.example.com",
|
||
"port": 1883,
|
||
"topicPattern": "thingsboard/telemetry/${deviceName}",
|
||
"clientId": "tb-forwarder",
|
||
"cleanSession": true,
|
||
"retainedMessage": false,
|
||
"ssl": false,
|
||
"credentials": {
|
||
"type": "anonymous"
|
||
}
|
||
}
|
||
```
|
||
|
||
3. **连接规则链**
|
||
```
|
||
[设备遥测数据] → [MQTT 节点] → [外部 MQTT Broker]
|
||
↓
|
||
[外部服务订阅]
|
||
```
|
||
|
||
4. **外部服务订阅**
|
||
```python
|
||
import paho.mqtt.client as mqtt
|
||
|
||
def on_connect(client, userdata, flags, rc):
|
||
client.subscribe("thingsboard/telemetry/+")
|
||
|
||
def on_message(client, userdata, msg):
|
||
print(f"收到设备数据: {msg.topic} - {msg.payload}")
|
||
|
||
client = mqtt.Client()
|
||
client.on_connect = on_connect
|
||
client.on_message = on_message
|
||
client.connect("external-mqtt-broker.example.com", 1883, 60)
|
||
client.loop_forever()
|
||
```
|
||
|
||
**优势**:
|
||
- ✅ 实时转发,延迟低
|
||
- ✅ 支持多个外部服务订阅同一主题
|
||
- ✅ 解耦 ThingsBoard 和外部服务
|
||
- ✅ 支持 QoS、认证等 MQTT 特性
|
||
|
||
### 方案 2:通过规则引擎转发到 Kafka
|
||
|
||
**适用场景**:外部服务可以连接 Kafka 进行消费
|
||
|
||
**实现步骤**:
|
||
|
||
1. **在规则引擎中添加 Kafka 节点**
|
||
- 添加 "Kafka" 节点(External → Kafka)
|
||
- 配置 Kafka Broker 信息
|
||
|
||
2. **配置 Kafka 节点**
|
||
```json
|
||
{
|
||
"bootstrapServers": "kafka-broker:9092",
|
||
"topicPattern": "thingsboard-telemetry",
|
||
"acks": "1",
|
||
"retries": 3,
|
||
"batchSize": 16384,
|
||
"linger": 5,
|
||
"bufferMemory": 33554432
|
||
}
|
||
```
|
||
|
||
3. **外部服务消费 Kafka**
|
||
```python
|
||
from kafka import KafkaConsumer
|
||
import json
|
||
|
||
consumer = KafkaConsumer(
|
||
'thingsboard-telemetry',
|
||
bootstrap_servers=['kafka-broker:9092'],
|
||
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
|
||
)
|
||
|
||
for message in consumer:
|
||
print(f"收到设备数据: {message.value}")
|
||
```
|
||
|
||
### 方案 3:通过规则引擎调用 HTTP 接口
|
||
|
||
**适用场景**:外部服务提供 HTTP Webhook 接口
|
||
|
||
**实现步骤**:
|
||
|
||
1. **在规则引擎中添加 REST API Call 节点**
|
||
- 添加 "REST API Call" 节点(External → REST API Call)
|
||
- 配置外部服务的 HTTP 端点
|
||
|
||
2. **配置 REST API Call 节点**
|
||
```json
|
||
{
|
||
"restEndpointUrlPattern": "http://external-service:8080/api/telemetry",
|
||
"requestMethod": "POST",
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"Authorization": "Bearer YOUR_API_KEY"
|
||
}
|
||
}
|
||
```
|
||
|
||
3. **外部服务接收数据**
|
||
```python
|
||
from flask import Flask, request
|
||
|
||
app = Flask(__name__)
|
||
|
||
@app.route('/api/telemetry', methods=['POST'])
|
||
def receive_telemetry():
|
||
data = request.json
|
||
print(f"收到设备数据: {data}")
|
||
# 处理数据...
|
||
return {"status": "ok"}, 200
|
||
|
||
app.run(host='0.0.0.0', port=8080)
|
||
```
|
||
|
||
### 方案 4:直接订阅 ThingsBoard 内部 Kafka(如果使用 Kafka 队列)
|
||
|
||
**适用场景**:ThingsBoard 使用 Kafka 作为消息队列,且外部服务可以访问同一 Kafka 集群
|
||
|
||
**实现步骤**:
|
||
|
||
1. **确认 ThingsBoard 使用的 Kafka 主题**
|
||
- ThingsBoard 内部使用以下主题:
|
||
- `tb.core.*` - 核心服务消息
|
||
- `tb.rule-engine.*` - 规则引擎消息
|
||
- `tb.transport.*` - 传输层消息
|
||
|
||
2. **外部服务订阅 Kafka**
|
||
```python
|
||
from kafka import KafkaConsumer
|
||
import json
|
||
|
||
# 注意:需要了解 ThingsBoard 内部消息格式(Protobuf)
|
||
consumer = KafkaConsumer(
|
||
'tb.core.telemetry', # 示例主题,需要根据实际情况调整
|
||
bootstrap_servers=['kafka:9092'],
|
||
group_id='external-service'
|
||
)
|
||
|
||
for message in consumer:
|
||
# ThingsBoard 使用 Protobuf 格式,需要解析
|
||
print(f"收到消息: {message.value}")
|
||
```
|
||
|
||
**注意**:此方案需要了解 ThingsBoard 内部消息格式,不推荐。
|
||
|
||
### 方案对比
|
||
|
||
| 方案 | 实时性 | 复杂度 | 推荐度 | 适用场景 |
|
||
|------|--------|--------|--------|----------|
|
||
| **外部 MQTT Broker** | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 最适合 IoT 场景 |
|
||
| **Kafka** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 高吞吐量场景 |
|
||
| **HTTP Webhook** | ⭐⭐⭐ | ⭐ | ⭐⭐⭐ | 简单集成场景 |
|
||
| **直接订阅 Kafka** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | 不推荐 |
|
||
|
||
### 推荐配置示例:外部 MQTT Broker
|
||
|
||
**完整配置流程**:
|
||
|
||
1. **部署外部 MQTT Broker**(如 Mosquitto)
|
||
```bash
|
||
docker run -it -p 1883:1883 -p 9001:9001 \
|
||
-v mosquitto.conf:/mosquitto/config/mosquitto.conf \
|
||
eclipse-mosquitto
|
||
```
|
||
|
||
2. **在 ThingsBoard 规则链中配置**
|
||
- 规则链:`Root Rule Chain` 或设备特定的规则链
|
||
- 节点连接:`Message Type Switch` → `POST_TELEMETRY_REQUEST` → `MQTT Node`
|
||
|
||
3. **MQTT 节点配置**
|
||
```json
|
||
{
|
||
"host": "192.168.1.100",
|
||
"port": 1883,
|
||
"topicPattern": "devices/${deviceName}/telemetry",
|
||
"clientId": "tb-forwarder-${serviceId}",
|
||
"cleanSession": true,
|
||
"retainedMessage": false,
|
||
"ssl": false,
|
||
"credentials": {
|
||
"type": "anonymous"
|
||
}
|
||
}
|
||
```
|
||
|
||
4. **外部服务订阅**
|
||
```python
|
||
import paho.mqtt.client as mqtt
|
||
import json
|
||
|
||
def on_connect(client, userdata, flags, rc):
|
||
# 订阅所有设备的遥测数据
|
||
client.subscribe("devices/+/telemetry", qos=1)
|
||
print("已订阅设备遥测数据")
|
||
|
||
def on_message(client, userdata, msg):
|
||
topic_parts = msg.topic.split('/')
|
||
device_name = topic_parts[1]
|
||
telemetry = json.loads(msg.payload)
|
||
|
||
print(f"设备: {device_name}")
|
||
print(f"数据: {telemetry}")
|
||
|
||
# 处理遥测数据...
|
||
|
||
client = mqtt.Client()
|
||
client.on_connect = on_connect
|
||
client.on_message = on_message
|
||
client.connect("192.168.1.100", 1883, 60)
|
||
client.loop_forever()
|
||
```
|
||
|
||
---
|
||
|
||
## 相关文档
|
||
|
||
- [ThingsBoard REST API 文档](https://thingsboard.io/docs/reference/rest-api/)
|
||
- [ThingsBoard WebSocket API 文档](https://thingsboard.io/docs/user-guide/contribution/websocket-api/)
|
||
- [ThingsBoard MQTT API 文档](https://thingsboard.io/docs/reference/mqtt-api/)
|
||
- [ThingsBoard 规则引擎 MQTT 节点文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/mqtt/)
|
||
- [ThingsBoard 规则引擎 Kafka 节点文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/kafka/)
|
||
|