thingsboard/summary/21-设备状态和遥测数据查询订阅指南.md

1021 lines
29 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ThingsBoard 设备状态和遥测数据查询订阅指南
## 目录
1. [查询设备遥测数据](#查询设备遥测数据)
2. [查询设备属性](#查询设备属性)
3. [查询设备状态](#查询设备状态)
4. [订阅实时更新](#订阅实时更新)
5. [使用示例](#使用示例)
---
## 查询设备遥测数据
### 1. 获取最新的遥测值(推荐)
**REST API 端点:**
```
GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries
```
**参数说明:**
- `entityType`: 实体类型,通常是 `DEVICE`
- `entityId`: 设备 IDUUID
- `keys` (可选): 遥测键名,多个用逗号分隔,如 `temperature,humidity`。不指定则返回所有键
- `useStrictDataTypes` (可选): 是否使用严格数据类型,默认 `false`(值转为字符串)
**示例:**
```bash
# 获取所有最新遥测值
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取指定键的最新遥测值
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature,humidity" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 使用严格数据类型(返回原始类型)
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&useStrictDataTypes=true" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
**响应格式(默认,字符串类型):**
```json
{
"temperature": [
{
"ts": 1705732800000,
"value": "25.5"
}
],
"humidity": [
{
"ts": 1705732800000,
"value": "60.2"
}
]
}
```
**响应格式useStrictDataTypes=true原始类型**
```json
{
"temperature": [
{
"ts": 1705732800000,
"value": 25.5
}
],
"humidity": [
{
"ts": 1705732800000,
"value": 60.2
}
]
}
```
### 2. 获取历史遥测数据
**REST API 端点:**
```
GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries?keys={keys}&startTs={startTs}&endTs={endTs}
```
**参数说明:**
- `keys`: 遥测键名,多个用逗号分隔
- `startTs`: 开始时间戳毫秒UTC
- `endTs`: 结束时间戳毫秒UTC
- `interval` (可选): 聚合间隔(毫秒),默认 0不聚合
- `intervalType` (可选): 聚合类型,如 `MILLISECONDS`, `SECONDS`, `MINUTES`, `HOURS`, `DAYS`, `WEEKS`, `MONTHS`
- `agg` (可选): 聚合函数,如 `MIN`, `MAX`, `AVG`, `SUM`, `COUNT`
- `limit` (可选): 最大返回数据点数,默认 100
- `orderBy` (可选): 排序方式,`ASC` 或 `DESC`,默认 `DESC`
**示例:**
```bash
# 获取最近1小时的数据
START_TS=$(($(date +%s) * 1000 - 3600000))
END_TS=$(($(date +%s) * 1000))
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&limit=100" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取聚合数据(每小时平均值)
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&interval=3600000&agg=AVG" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
**响应格式:**
```json
[
{
"ts": 1705732800000,
"value": "25.5"
},
{
"ts": 1705736400000,
"value": "26.0"
}
]
```
### 3. 获取所有遥测键名
```bash
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/keys/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
**响应格式:**
```json
["temperature", "humidity", "pressure", "batteryLevel"]
```
---
## 查询设备属性
### 1. 获取设备属性值
**REST API 端点:**
```
GET /api/plugins/telemetry/{entityType}/{entityId}/values/attributes
```
**参数说明:**
- `keys` (可选): 属性键名,多个用逗号分隔
**示例:**
```bash
# 获取所有属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取指定属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes?keys=model,firmware" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
### 2. 按作用域获取属性
属性分为三种作用域:
- `SERVER_SCOPE`: 服务器端属性ThingsBoard 管理)
- `SHARED_SCOPE`: 共享属性(服务器和客户端共享)
- `CLIENT_SCOPE`: 客户端属性(设备端管理)
```bash
# 获取服务器端属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
-H "X-Authorization: Bearer $JWT_TOKEN"
# 获取客户端属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/CLIENT_SCOPE" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
---
## 查询设备状态
### 1. 设备在线状态
设备状态存储在 `SERVER_SCOPE` 属性中,键名为 `active`
```bash
# 查询设备是否在线
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
**响应格式:**
```json
[
{
"key": "active",
"value": true,
"lastUpdateTs": 1705732800000
}
]
```
### 2. 设备连接信息
设备状态还包括以下信息(存储在 SERVER_SCOPE 属性或遥测中):
- `lastConnectTime`: 最后连接时间
- `lastDisconnectTime`: 最后断开时间
- `lastActivityTime`: 最后活动时间
```bash
# 查询设备连接信息
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active,lastConnectTime,lastActivityTime" \
-H "X-Authorization: Bearer $JWT_TOKEN"
```
---
## 订阅实时更新
### 1. MQTT 订阅(设备端)
ThingsBoard 支持通过 MQTT 协议订阅设备属性更新和 RPC 请求。**注意:** MQTT 主要用于设备端接收服务器推送的属性更新,遥测数据通常由设备发布给服务器。
#### 可订阅的 MQTT 主题
**v1 主题格式:**
| 主题 | 说明 | 用途 |
|------|------|------|
| `v1/devices/me/attributes` | 属性更新 | 订阅服务器推送给设备的属性更新(共享属性和服务器属性) |
| `v1/devices/me/attributes/response/+` | 属性请求响应 | 订阅属性请求的响应(`+` 是请求 ID |
| `v1/devices/me/rpc/request/+` | RPC 请求 | 订阅服务器发送的 RPC 请求(`+` 是请求 ID |
| `v1/devices/me/rpc/response/+` | RPC 响应 | 订阅 RPC 响应的主题(用于客户端 RPC |
**v2 短主题格式(更高效):**
| 主题 | 说明 | 对应 v1 主题 |
|------|------|-------------|
| `v2/a` | 属性更新(短格式) | `v1/devices/me/attributes` |
| `v2/a/res/+` | 属性响应(短格式) | `v1/devices/me/attributes/response/+` |
| `v2/r/req/+` | RPC 请求(短格式) | `v1/devices/me/rpc/request/+` |
| `v2/r/res/+` | RPC 响应(短格式) | `v1/devices/me/rpc/response/+` |
#### Python MQTT 订阅示例
```python
import paho.mqtt.client as mqtt
import json
# MQTT 连接配置
THINGSBOARD_HOST = "localhost"
THINGSBOARD_PORT = 1883
ACCESS_TOKEN = "YOUR_DEVICE_ACCESS_TOKEN"
def on_connect(client, userdata, flags, rc):
"""连接成功回调"""
if rc == 0:
print("Connected to ThingsBoard")
# 订阅属性更新
client.subscribe('v1/devices/me/attributes', qos=1)
print("Subscribed to: v1/devices/me/attributes")
# 订阅属性请求响应
client.subscribe('v1/devices/me/attributes/response/+', qos=1)
print("Subscribed to: v1/devices/me/attributes/response/+")
# 订阅 RPC 请求
client.subscribe('v1/devices/me/rpc/request/+', qos=1)
print("Subscribed to: v1/devices/me/rpc/request/+")
# 请求服务器端属性(可选)
request_id = 1
request = {"keys": "active,lastConnectTime"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
print(f"Requested attributes: {request}")
else:
print(f"Failed to connect, return code {rc}")
def on_message(client, userdata, msg):
"""消息接收回调"""
topic = msg.topic
payload = msg.payload.decode('utf-8')
print(f"\n收到消息 - 主题: {topic}")
print(f"消息内容: {payload}")
try:
data = json.loads(payload)
# 处理属性更新
if topic == 'v1/devices/me/attributes':
print("=== 属性更新 ===")
for key, value in data.items():
print(f" {key}: {value}")
# 处理属性请求响应
elif topic.startswith('v1/devices/me/attributes/response/'):
request_id = topic.split('/')[-1]
print(f"=== 属性请求响应 (ID: {request_id}) ===")
if 'client' in data:
print("客户端属性:")
for key, value in data['client'].items():
print(f" {key}: {value}")
if 'shared' in data:
print("共享属性:")
for key, value in data['shared'].items():
print(f" {key}: {value}")
if 'server' in data:
print("服务器属性:")
for key, value in data['server'].items():
print(f" {key}: {value}")
# 处理 RPC 请求
elif topic.startswith('v1/devices/me/rpc/request/'):
request_id = topic.split('/')[-1]
print(f"=== RPC 请求 (ID: {request_id}) ===")
method = data.get('method', '')
params = data.get('params', {})
print(f"方法: {method}")
print(f"参数: {params}")
# 处理 RPC 请求并回复
if method == 'getCurrentTime':
response = {
"time": "2025-01-20 10:30:00"
}
elif method == 'setGpio':
# 处理 GPIO 设置
pin = params.get('pin')
value = params.get('value')
print(f"设置 GPIO Pin {pin} = {value}")
response = {"success": True, "pin": pin, "value": value}
else:
response = {"error": f"Unknown method: {method}"}
# 发送 RPC 响应
response_topic = f'v1/devices/me/rpc/response/{request_id}'
client.publish(response_topic, json.dumps(response), qos=1)
print(f"已发送 RPC 响应到: {response_topic}")
except json.JSONDecodeError:
print(f"无法解析 JSON: {payload}")
def on_disconnect(client, userdata, rc):
"""断开连接回调"""
print("Disconnected from ThingsBoard")
# 创建 MQTT 客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# 设置认证(使用 Access Token 作为用户名)
client.username_pw_set(ACCESS_TOKEN)
# 连接到 ThingsBoard
try:
client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
client.loop_forever()
except Exception as e:
print(f"连接失败: {e}")
```
#### 请求设备属性(通过 MQTT
设备可以通过发布请求消息来获取属性值:
```python
# 请求客户端属性
request_id = 1
request = {"clientKeys": "model,version"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
# 请求共享属性
request_id = 2
request = {"sharedKeys": "config"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
# 请求所有属性
request_id = 3
request = {"clientKeys": "model", "sharedKeys": "config", "serverKeys": "active"}
client.publish(
f'v1/devices/me/attributes/request/{request_id}',
json.dumps(request),
qos=1
)
```
响应会发布到 `v1/devices/me/attributes/response/{request_id}` 主题。
#### 使用 v2 短主题(更高效)
```python
# 使用 v2 短主题
client.subscribe('v2/a', qos=1) # 属性更新
client.subscribe('v2/a/res/+', qos=1) # 属性响应
client.subscribe('v2/r/req/+', qos=1) # RPC 请求
# 请求属性v2 格式)
request_id = 1
request = {"clientKeys": "model"}
client.publish(f'v2/a/req/{request_id}', json.dumps(request), qos=1)
```
#### 注意事项
1. **遥测数据订阅**MQTT 协议中,遥测数据通常由设备发布给服务器,而不是服务器推送给设备。如果需要实时获取遥测数据,建议:
- 使用 WebSocket 订阅(见下方)
- 或通过 RPC 请求设备主动发送遥测数据
2. **设备状态**:设备状态(`active`)存储在服务器属性中,可以通过属性请求获取:
```python
request = {"serverKeys": "active,lastConnectTime,lastActivityTime"}
client.publish(f'v1/devices/me/attributes/request/1', json.dumps(request), qos=1)
```
3. **QoS 级别**:推荐使用 QoS 1至少一次确保消息不丢失
4. **连接认证**:使用设备的 Access Token 作为 MQTT 用户名,密码留空
---
### 2. WebSocket 订阅(应用端,推荐)
ThingsBoard 支持通过 WebSocket 实时订阅设备遥测和属性的更新。**适用于应用端(如 Web 应用、后端服务)查询和订阅设备数据。**
**WebSocket 连接:**
```
ws://localhost:8080/api/ws/plugins/telemetry
```
**认证方式:**
- 使用 JWT Token 作为查询参数:`?token=$JWT_TOKEN`
- 或使用 Bearer Token 在连接头中
**订阅命令格式:**
```json
{
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId}",
"scope": "LATEST_TELEMETRY",
"cmdId": 1
}
],
"attrSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId}",
"scope": "SERVER_SCOPE",
"cmdId": 2,
"keys": "active"
}
]
}
```
**JavaScript 示例:**
```javascript
// 连接 WebSocket
const token = 'YOUR_JWT_TOKEN';
const ws = new WebSocket(`ws://localhost:8080/api/ws/plugins/telemetry?token=${token}`);
ws.onopen = function() {
console.log('WebSocket connected');
// 订阅遥测数据
const subscribeCmd = {
tsSubCmds: [
{
entityType: 'DEVICE',
entityId: 'YOUR_DEVICE_ID',
scope: 'LATEST_TELEMETRY',
cmdId: 1
}
],
attrSubCmds: [
{
entityType: 'DEVICE',
entityId: 'YOUR_DEVICE_ID',
scope: 'SERVER_SCOPE',
cmdId: 2,
keys: 'active'
}
]
};
ws.send(JSON.stringify(subscribeCmd));
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log('Received update:', data);
// 处理遥测更新
if (data.subscriptionId === 1) {
console.log('Telemetry update:', data.data);
}
// 处理属性更新
if (data.subscriptionId === 2) {
console.log('Attribute update:', data.data);
}
};
ws.onerror = function(error) {
console.error('WebSocket error:', error);
};
ws.onclose = function() {
console.log('WebSocket closed');
};
```
**Python 示例:**
```python
import asyncio
import websockets
import json
async def subscribe_to_device():
token = 'YOUR_JWT_TOKEN'
device_id = 'YOUR_DEVICE_ID'
uri = f'ws://localhost:8080/api/ws/plugins/telemetry?token={token}'
async with websockets.connect(uri) as websocket:
# 订阅命令
subscribe_cmd = {
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": device_id,
"scope": "LATEST_TELEMETRY",
"cmdId": 1
}
],
"attrSubCmds": [
{
"entityType": "DEVICE",
"entityId": device_id,
"scope": "SERVER_SCOPE",
"cmdId": 2,
"keys": "active"
}
]
}
await websocket.send(json.dumps(subscribe_cmd))
print("Subscribed to device updates")
# 接收更新
while True:
message = await websocket.recv()
data = json.loads(message)
if 'subscriptionId' in data:
if data['subscriptionId'] == 1:
print(f"Telemetry update: {data.get('data', {})}")
elif data['subscriptionId'] == 2:
print(f"Attribute update: {data.get('data', {})}")
# 运行
asyncio.run(subscribe_to_device())
```
### 2. 订阅特定键的遥测
```json
{
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId}",
"scope": "LATEST_TELEMETRY",
"cmdId": 1,
"keys": "temperature,humidity"
}
]
}
```
### 3. 订阅多个设备
```json
{
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": "{deviceId1}",
"scope": "LATEST_TELEMETRY",
"cmdId": 1
},
{
"entityType": "DEVICE",
"entityId": "{deviceId2}",
"scope": "LATEST_TELEMETRY",
"cmdId": 2
}
]
}
```
---
## 使用示例
### 完整示例:查询和监控设备
**Python 脚本示例:**
```python
import requests
import websockets
import json
import asyncio
from datetime import datetime
# 配置
THINGSBOARD_HOST = "localhost:8080"
JWT_TOKEN = "YOUR_JWT_TOKEN"
DEVICE_ID = "YOUR_DEVICE_ID"
def get_latest_telemetry():
"""获取最新遥测数据"""
url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/timeseries"
headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
print(f"Error: {response.status_code}")
return None
def get_device_status():
"""获取设备状态"""
url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/attributes/SERVER_SCOPE"
params = {"keys": "active,lastConnectTime,lastActivityTime"}
headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Error: {response.status_code}")
return None
async def subscribe_to_device():
"""订阅设备实时更新"""
uri = f"ws://{THINGSBOARD_HOST}/api/ws/plugins/telemetry?token={JWT_TOKEN}"
async with websockets.connect(uri) as websocket:
# 订阅遥测和属性
subscribe_cmd = {
"tsSubCmds": [
{
"entityType": "DEVICE",
"entityId": DEVICE_ID,
"scope": "LATEST_TELEMETRY",
"cmdId": 1
}
],
"attrSubCmds": [
{
"entityType": "DEVICE",
"entityId": DEVICE_ID,
"scope": "SERVER_SCOPE",
"cmdId": 2,
"keys": "active"
}
]
}
await websocket.send(json.dumps(subscribe_cmd))
print("已订阅设备更新,等待数据...")
while True:
message = await websocket.recv()
data = json.loads(message)
if 'subscriptionId' in data:
if data['subscriptionId'] == 1:
# 遥测更新
telemetry = data.get('data', {})
for key, values in telemetry.items():
if values:
value = values[0]
print(f"[{datetime.now()}] 遥测更新 - {key}: {value['value']} (时间: {value['ts']})")
elif data['subscriptionId'] == 2:
# 属性更新
attributes = data.get('data', {})
for attr in attributes:
print(f"[{datetime.now()}] 属性更新 - {attr['key']}: {attr['value']}")
if __name__ == "__main__":
# 查询当前状态
print("=== 查询当前遥测数据 ===")
telemetry = get_latest_telemetry()
if telemetry:
for key, values in telemetry.items():
if values:
print(f"{key}: {values[0]['value']} (时间: {values[0]['ts']})")
print("\n=== 查询设备状态 ===")
status = get_device_status()
if status:
for attr in status:
print(f"{attr['key']}: {attr['value']}")
print("\n=== 开始实时订阅 ===")
asyncio.run(subscribe_to_device())
```
**curl 脚本示例:**
```bash
#!/bin/bash
THINGSBOARD_HOST="localhost:8080"
JWT_TOKEN="YOUR_JWT_TOKEN"
DEVICE_ID="YOUR_DEVICE_ID"
echo "=== 获取最新遥测数据 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
echo -e "\n=== 获取设备状态 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/attributes/SERVER_SCOPE?keys=active" \
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
echo -e "\n=== 获取所有遥测键 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/keys/timeseries" \
-H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'
```
---
## 注意事项
1. **认证:** 所有 API 调用都需要有效的 JWT Token
2. **权限:** 确保用户有权限访问设备(`TENANT_ADMIN` 或 `CUSTOMER_USER`
3. **设备 ID** 使用设备的 UUID不是设备名称
4. **时间戳:** 所有时间戳都是 UTC 时间,单位是毫秒
5. **WebSocket** 订阅是实时的,适合需要即时更新的场景
6. **轮询 vs 订阅:**
- 轮询REST API适合不频繁查询的场景
- 订阅WebSocket适合需要实时更新的场景减少服务器负载
---
---
## 外部服务订阅方案(重要)
如果你的外部服务需要订阅设备发送给 ThingsBoard 的实时遥测数据,有以下几种方案:
### 方案 1通过规则引擎转发到外部 MQTT Broker推荐
**适用场景**:外部服务可以连接到外部 MQTT Broker 进行订阅
**实现步骤**
1. **在 ThingsBoard 规则引擎中添加 MQTT 节点**
- 进入 ThingsBoard Web UI
- 打开规则链Rule Chain编辑器
- 添加 "MQTT" 节点External → MQTT
- 配置外部 MQTT Broker 连接信息
2. **配置 MQTT 节点**
```json
{
"host": "external-mqtt-broker.example.com",
"port": 1883,
"topicPattern": "thingsboard/telemetry/${deviceName}",
"clientId": "tb-forwarder",
"cleanSession": true,
"retainedMessage": false,
"ssl": false,
"credentials": {
"type": "anonymous"
}
}
```
3. **连接规则链**
```
[设备遥测数据] → [MQTT 节点] → [外部 MQTT Broker]
[外部服务订阅]
```
4. **外部服务订阅**
```python
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
client.subscribe("thingsboard/telemetry/+")
def on_message(client, userdata, msg):
print(f"收到设备数据: {msg.topic} - {msg.payload}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("external-mqtt-broker.example.com", 1883, 60)
client.loop_forever()
```
**优势**
- ✅ 实时转发,延迟低
- ✅ 支持多个外部服务订阅同一主题
- ✅ 解耦 ThingsBoard 和外部服务
- ✅ 支持 QoS、认证等 MQTT 特性
### 方案 2通过规则引擎转发到 Kafka
**适用场景**:外部服务可以连接 Kafka 进行消费
**实现步骤**
1. **在规则引擎中添加 Kafka 节点**
- 添加 "Kafka" 节点External → Kafka
- 配置 Kafka Broker 信息
2. **配置 Kafka 节点**
```json
{
"bootstrapServers": "kafka-broker:9092",
"topicPattern": "thingsboard-telemetry",
"acks": "1",
"retries": 3,
"batchSize": 16384,
"linger": 5,
"bufferMemory": 33554432
}
```
3. **外部服务消费 Kafka**
```python
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'thingsboard-telemetry',
bootstrap_servers=['kafka-broker:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
for message in consumer:
print(f"收到设备数据: {message.value}")
```
### 方案 3通过规则引擎调用 HTTP 接口
**适用场景**:外部服务提供 HTTP Webhook 接口
**实现步骤**
1. **在规则引擎中添加 REST API Call 节点**
- 添加 "REST API Call" 节点External → REST API Call
- 配置外部服务的 HTTP 端点
2. **配置 REST API Call 节点**
```json
{
"restEndpointUrlPattern": "http://external-service:8080/api/telemetry",
"requestMethod": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_API_KEY"
}
}
```
3. **外部服务接收数据**
```python
from flask import Flask, request
app = Flask(__name__)
@app.route('/api/telemetry', methods=['POST'])
def receive_telemetry():
data = request.json
print(f"收到设备数据: {data}")
# 处理数据...
return {"status": "ok"}, 200
app.run(host='0.0.0.0', port=8080)
```
### 方案 4直接订阅 ThingsBoard 内部 Kafka如果使用 Kafka 队列)
**适用场景**ThingsBoard 使用 Kafka 作为消息队列,且外部服务可以访问同一 Kafka 集群
**实现步骤**
1. **确认 ThingsBoard 使用的 Kafka 主题**
- ThingsBoard 内部使用以下主题:
- `tb.core.*` - 核心服务消息
- `tb.rule-engine.*` - 规则引擎消息
- `tb.transport.*` - 传输层消息
2. **外部服务订阅 Kafka**
```python
from kafka import KafkaConsumer
import json
# 注意:需要了解 ThingsBoard 内部消息格式Protobuf
consumer = KafkaConsumer(
'tb.core.telemetry', # 示例主题,需要根据实际情况调整
bootstrap_servers=['kafka:9092'],
group_id='external-service'
)
for message in consumer:
# ThingsBoard 使用 Protobuf 格式,需要解析
print(f"收到消息: {message.value}")
```
**注意**:此方案需要了解 ThingsBoard 内部消息格式,不推荐。
### 方案对比
| 方案 | 实时性 | 复杂度 | 推荐度 | 适用场景 |
|------|--------|--------|--------|----------|
| **外部 MQTT Broker** | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 最适合 IoT 场景 |
| **Kafka** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | 高吞吐量场景 |
| **HTTP Webhook** | ⭐⭐⭐ | ⭐ | ⭐⭐⭐ | 简单集成场景 |
| **直接订阅 Kafka** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | 不推荐 |
### 推荐配置示例:外部 MQTT Broker
**完整配置流程**
1. **部署外部 MQTT Broker**(如 Mosquitto
```bash
docker run -it -p 1883:1883 -p 9001:9001 \
-v mosquitto.conf:/mosquitto/config/mosquitto.conf \
eclipse-mosquitto
```
2. **在 ThingsBoard 规则链中配置**
- 规则链:`Root Rule Chain` 或设备特定的规则链
- 节点连接:`Message Type Switch` → `POST_TELEMETRY_REQUEST``MQTT Node`
3. **MQTT 节点配置**
```json
{
"host": "192.168.1.100",
"port": 1883,
"topicPattern": "devices/${deviceName}/telemetry",
"clientId": "tb-forwarder-${serviceId}",
"cleanSession": true,
"retainedMessage": false,
"ssl": false,
"credentials": {
"type": "anonymous"
}
}
```
4. **外部服务订阅**
```python
import paho.mqtt.client as mqtt
import json
def on_connect(client, userdata, flags, rc):
# 订阅所有设备的遥测数据
client.subscribe("devices/+/telemetry", qos=1)
print("已订阅设备遥测数据")
def on_message(client, userdata, msg):
topic_parts = msg.topic.split('/')
device_name = topic_parts[1]
telemetry = json.loads(msg.payload)
print(f"设备: {device_name}")
print(f"数据: {telemetry}")
# 处理遥测数据...
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.1.100", 1883, 60)
client.loop_forever()
```
---
## 相关文档
- [ThingsBoard REST API 文档](https://thingsboard.io/docs/reference/rest-api/)
- [ThingsBoard WebSocket API 文档](https://thingsboard.io/docs/user-guide/contribution/websocket-api/)
- [ThingsBoard MQTT API 文档](https://thingsboard.io/docs/reference/mqtt-api/)
- [ThingsBoard 规则引擎 MQTT 节点文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/mqtt/)
- [ThingsBoard 规则引擎 Kafka 节点文档](https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/kafka/)