thingsboard/summary/21-设备状态和遥测数据查询订阅指南.md

29 KiB
Raw Permalink Blame History

ThingsBoard 设备状态和遥测数据查询订阅指南

目录

  1. 查询设备遥测数据
  2. 查询设备属性
  3. 查询设备状态
  4. 订阅实时更新
  5. 使用示例

查询设备遥测数据

1. 获取最新的遥测值(推荐)

REST API 端点:

GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries

参数说明:

  • entityType: 实体类型,通常是 DEVICE
  • entityId: 设备 IDUUID
  • keys (可选): 遥测键名,多个用逗号分隔,如 temperature,humidity。不指定则返回所有键
  • useStrictDataTypes (可选): 是否使用严格数据类型,默认 false(值转为字符串)

示例:

# 获取所有最新遥测值
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

# 获取指定键的最新遥测值
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature,humidity" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

# 使用严格数据类型(返回原始类型)
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&useStrictDataTypes=true" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

响应格式(默认,字符串类型):

{
  "temperature": [
    {
      "ts": 1705732800000,
      "value": "25.5"
    }
  ],
  "humidity": [
    {
      "ts": 1705732800000,
      "value": "60.2"
    }
  ]
}

响应格式useStrictDataTypes=true原始类型

{
  "temperature": [
    {
      "ts": 1705732800000,
      "value": 25.5
    }
  ],
  "humidity": [
    {
      "ts": 1705732800000,
      "value": 60.2
    }
  ]
}

2. 获取历史遥测数据

REST API 端点:

GET /api/plugins/telemetry/{entityType}/{entityId}/values/timeseries?keys={keys}&startTs={startTs}&endTs={endTs}

参数说明:

  • keys: 遥测键名,多个用逗号分隔
  • startTs: 开始时间戳毫秒UTC
  • endTs: 结束时间戳毫秒UTC
  • interval (可选): 聚合间隔(毫秒),默认 0不聚合
  • intervalType (可选): 聚合类型,如 MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS, WEEKS, MONTHS
  • agg (可选): 聚合函数,如 MIN, MAX, AVG, SUM, COUNT
  • limit (可选): 最大返回数据点数,默认 100
  • orderBy (可选): 排序方式,ASCDESC,默认 DESC

示例:

# 获取最近1小时的数据
START_TS=$(($(date +%s) * 1000 - 3600000))
END_TS=$(($(date +%s) * 1000))

curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&limit=100" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

# 获取聚合数据(每小时平均值)
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries?keys=temperature&startTs=$START_TS&endTs=$END_TS&interval=3600000&agg=AVG" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

响应格式:

[
  {
    "ts": 1705732800000,
    "value": "25.5"
  },
  {
    "ts": 1705736400000,
    "value": "26.0"
  }
]

3. 获取所有遥测键名

curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/keys/timeseries" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

响应格式:

["temperature", "humidity", "pressure", "batteryLevel"]

查询设备属性

1. 获取设备属性值

REST API 端点:

GET /api/plugins/telemetry/{entityType}/{entityId}/values/attributes

参数说明:

  • keys (可选): 属性键名,多个用逗号分隔

示例:

# 获取所有属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

# 获取指定属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes?keys=model,firmware" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

2. 按作用域获取属性

属性分为三种作用域:

  • SERVER_SCOPE: 服务器端属性ThingsBoard 管理)
  • SHARED_SCOPE: 共享属性(服务器和客户端共享)
  • CLIENT_SCOPE: 客户端属性(设备端管理)
# 获取服务器端属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

# 获取客户端属性
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/CLIENT_SCOPE" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

查询设备状态

1. 设备在线状态

设备状态存储在 SERVER_SCOPE 属性中,键名为 active

# 查询设备是否在线
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

响应格式:

[
  {
    "key": "active",
    "value": true,
    "lastUpdateTs": 1705732800000
  }
]

2. 设备连接信息

设备状态还包括以下信息(存储在 SERVER_SCOPE 属性或遥测中):

  • lastConnectTime: 最后连接时间
  • lastDisconnectTime: 最后断开时间
  • lastActivityTime: 最后活动时间
# 查询设备连接信息
curl -X GET "http://localhost:8080/api/plugins/telemetry/DEVICE/{deviceId}/values/attributes/SERVER_SCOPE?keys=active,lastConnectTime,lastActivityTime" \
  -H "X-Authorization: Bearer $JWT_TOKEN"

订阅实时更新

1. MQTT 订阅(设备端)

ThingsBoard 支持通过 MQTT 协议订阅设备属性更新和 RPC 请求。注意: MQTT 主要用于设备端接收服务器推送的属性更新,遥测数据通常由设备发布给服务器。

可订阅的 MQTT 主题

v1 主题格式:

主题 说明 用途
v1/devices/me/attributes 属性更新 订阅服务器推送给设备的属性更新(共享属性和服务器属性)
v1/devices/me/attributes/response/+ 属性请求响应 订阅属性请求的响应(+ 是请求 ID
v1/devices/me/rpc/request/+ RPC 请求 订阅服务器发送的 RPC 请求(+ 是请求 ID
v1/devices/me/rpc/response/+ RPC 响应 订阅 RPC 响应的主题(用于客户端 RPC

v2 短主题格式(更高效):

主题 说明 对应 v1 主题
v2/a 属性更新(短格式) v1/devices/me/attributes
v2/a/res/+ 属性响应(短格式) v1/devices/me/attributes/response/+
v2/r/req/+ RPC 请求(短格式) v1/devices/me/rpc/request/+
v2/r/res/+ RPC 响应(短格式) v1/devices/me/rpc/response/+

Python MQTT 订阅示例

import paho.mqtt.client as mqtt
import json

# MQTT 连接配置
THINGSBOARD_HOST = "localhost"
THINGSBOARD_PORT = 1883
ACCESS_TOKEN = "YOUR_DEVICE_ACCESS_TOKEN"

def on_connect(client, userdata, flags, rc):
    """连接成功回调"""
    if rc == 0:
        print("Connected to ThingsBoard")
        
        # 订阅属性更新
        client.subscribe('v1/devices/me/attributes', qos=1)
        print("Subscribed to: v1/devices/me/attributes")
        
        # 订阅属性请求响应
        client.subscribe('v1/devices/me/attributes/response/+', qos=1)
        print("Subscribed to: v1/devices/me/attributes/response/+")
        
        # 订阅 RPC 请求
        client.subscribe('v1/devices/me/rpc/request/+', qos=1)
        print("Subscribed to: v1/devices/me/rpc/request/+")
        
        # 请求服务器端属性(可选)
        request_id = 1
        request = {"keys": "active,lastConnectTime"}
        client.publish(
            f'v1/devices/me/attributes/request/{request_id}',
            json.dumps(request),
            qos=1
        )
        print(f"Requested attributes: {request}")
    else:
        print(f"Failed to connect, return code {rc}")

def on_message(client, userdata, msg):
    """消息接收回调"""
    topic = msg.topic
    payload = msg.payload.decode('utf-8')
    
    print(f"\n收到消息 - 主题: {topic}")
    print(f"消息内容: {payload}")
    
    try:
        data = json.loads(payload)
        
        # 处理属性更新
        if topic == 'v1/devices/me/attributes':
            print("=== 属性更新 ===")
            for key, value in data.items():
                print(f"  {key}: {value}")
        
        # 处理属性请求响应
        elif topic.startswith('v1/devices/me/attributes/response/'):
            request_id = topic.split('/')[-1]
            print(f"=== 属性请求响应 (ID: {request_id}) ===")
            if 'client' in data:
                print("客户端属性:")
                for key, value in data['client'].items():
                    print(f"  {key}: {value}")
            if 'shared' in data:
                print("共享属性:")
                for key, value in data['shared'].items():
                    print(f"  {key}: {value}")
            if 'server' in data:
                print("服务器属性:")
                for key, value in data['server'].items():
                    print(f"  {key}: {value}")
        
        # 处理 RPC 请求
        elif topic.startswith('v1/devices/me/rpc/request/'):
            request_id = topic.split('/')[-1]
            print(f"=== RPC 请求 (ID: {request_id}) ===")
            method = data.get('method', '')
            params = data.get('params', {})
            print(f"方法: {method}")
            print(f"参数: {params}")
            
            # 处理 RPC 请求并回复
            if method == 'getCurrentTime':
                response = {
                    "time": "2025-01-20 10:30:00"
                }
            elif method == 'setGpio':
                # 处理 GPIO 设置
                pin = params.get('pin')
                value = params.get('value')
                print(f"设置 GPIO Pin {pin} = {value}")
                response = {"success": True, "pin": pin, "value": value}
            else:
                response = {"error": f"Unknown method: {method}"}
            
            # 发送 RPC 响应
            response_topic = f'v1/devices/me/rpc/response/{request_id}'
            client.publish(response_topic, json.dumps(response), qos=1)
            print(f"已发送 RPC 响应到: {response_topic}")
    
    except json.JSONDecodeError:
        print(f"无法解析 JSON: {payload}")

def on_disconnect(client, userdata, rc):
    """断开连接回调"""
    print("Disconnected from ThingsBoard")

# 创建 MQTT 客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# 设置认证(使用 Access Token 作为用户名)
client.username_pw_set(ACCESS_TOKEN)

# 连接到 ThingsBoard
try:
    client.connect(THINGSBOARD_HOST, THINGSBOARD_PORT, 60)
    client.loop_forever()
except Exception as e:
    print(f"连接失败: {e}")

请求设备属性(通过 MQTT

设备可以通过发布请求消息来获取属性值:

# 请求客户端属性
request_id = 1
request = {"clientKeys": "model,version"}
client.publish(
    f'v1/devices/me/attributes/request/{request_id}',
    json.dumps(request),
    qos=1
)

# 请求共享属性
request_id = 2
request = {"sharedKeys": "config"}
client.publish(
    f'v1/devices/me/attributes/request/{request_id}',
    json.dumps(request),
    qos=1
)

# 请求所有属性
request_id = 3
request = {"clientKeys": "model", "sharedKeys": "config", "serverKeys": "active"}
client.publish(
    f'v1/devices/me/attributes/request/{request_id}',
    json.dumps(request),
    qos=1
)

响应会发布到 v1/devices/me/attributes/response/{request_id} 主题。

使用 v2 短主题(更高效)

# 使用 v2 短主题
client.subscribe('v2/a', qos=1)  # 属性更新
client.subscribe('v2/a/res/+', qos=1)  # 属性响应
client.subscribe('v2/r/req/+', qos=1)  # RPC 请求

# 请求属性v2 格式)
request_id = 1
request = {"clientKeys": "model"}
client.publish(f'v2/a/req/{request_id}', json.dumps(request), qos=1)

注意事项

  1. 遥测数据订阅MQTT 协议中,遥测数据通常由设备发布给服务器,而不是服务器推送给设备。如果需要实时获取遥测数据,建议:

    • 使用 WebSocket 订阅(见下方)
    • 或通过 RPC 请求设备主动发送遥测数据
  2. 设备状态:设备状态(active)存储在服务器属性中,可以通过属性请求获取:

    request = {"serverKeys": "active,lastConnectTime,lastActivityTime"}
    client.publish(f'v1/devices/me/attributes/request/1', json.dumps(request), qos=1)
    
  3. QoS 级别:推荐使用 QoS 1至少一次确保消息不丢失

  4. 连接认证:使用设备的 Access Token 作为 MQTT 用户名,密码留空


2. WebSocket 订阅(应用端,推荐)

ThingsBoard 支持通过 WebSocket 实时订阅设备遥测和属性的更新。适用于应用端(如 Web 应用、后端服务)查询和订阅设备数据。

WebSocket 连接:

ws://localhost:8080/api/ws/plugins/telemetry

认证方式:

  • 使用 JWT Token 作为查询参数:?token=$JWT_TOKEN
  • 或使用 Bearer Token 在连接头中

订阅命令格式:

{
  "tsSubCmds": [
    {
      "entityType": "DEVICE",
      "entityId": "{deviceId}",
      "scope": "LATEST_TELEMETRY",
      "cmdId": 1
    }
  ],
  "attrSubCmds": [
    {
      "entityType": "DEVICE",
      "entityId": "{deviceId}",
      "scope": "SERVER_SCOPE",
      "cmdId": 2,
      "keys": "active"
    }
  ]
}

JavaScript 示例:

// 连接 WebSocket
const token = 'YOUR_JWT_TOKEN';
const ws = new WebSocket(`ws://localhost:8080/api/ws/plugins/telemetry?token=${token}`);

ws.onopen = function() {
  console.log('WebSocket connected');
  
  // 订阅遥测数据
  const subscribeCmd = {
    tsSubCmds: [
      {
        entityType: 'DEVICE',
        entityId: 'YOUR_DEVICE_ID',
        scope: 'LATEST_TELEMETRY',
        cmdId: 1
      }
    ],
    attrSubCmds: [
      {
        entityType: 'DEVICE',
        entityId: 'YOUR_DEVICE_ID',
        scope: 'SERVER_SCOPE',
        cmdId: 2,
        keys: 'active'
      }
    ]
  };
  
  ws.send(JSON.stringify(subscribeCmd));
};

ws.onmessage = function(event) {
  const data = JSON.parse(event.data);
  console.log('Received update:', data);
  
  // 处理遥测更新
  if (data.subscriptionId === 1) {
    console.log('Telemetry update:', data.data);
  }
  
  // 处理属性更新
  if (data.subscriptionId === 2) {
    console.log('Attribute update:', data.data);
  }
};

ws.onerror = function(error) {
  console.error('WebSocket error:', error);
};

ws.onclose = function() {
  console.log('WebSocket closed');
};

Python 示例:

import asyncio
import websockets
import json

async def subscribe_to_device():
    token = 'YOUR_JWT_TOKEN'
    device_id = 'YOUR_DEVICE_ID'
    
    uri = f'ws://localhost:8080/api/ws/plugins/telemetry?token={token}'
    
    async with websockets.connect(uri) as websocket:
        # 订阅命令
        subscribe_cmd = {
            "tsSubCmds": [
                {
                    "entityType": "DEVICE",
                    "entityId": device_id,
                    "scope": "LATEST_TELEMETRY",
                    "cmdId": 1
                }
            ],
            "attrSubCmds": [
                {
                    "entityType": "DEVICE",
                    "entityId": device_id,
                    "scope": "SERVER_SCOPE",
                    "cmdId": 2,
                    "keys": "active"
                }
            ]
        }
        
        await websocket.send(json.dumps(subscribe_cmd))
        print("Subscribed to device updates")
        
        # 接收更新
        while True:
            message = await websocket.recv()
            data = json.loads(message)
            
            if 'subscriptionId' in data:
                if data['subscriptionId'] == 1:
                    print(f"Telemetry update: {data.get('data', {})}")
                elif data['subscriptionId'] == 2:
                    print(f"Attribute update: {data.get('data', {})}")

# 运行
asyncio.run(subscribe_to_device())

2. 订阅特定键的遥测

{
  "tsSubCmds": [
    {
      "entityType": "DEVICE",
      "entityId": "{deviceId}",
      "scope": "LATEST_TELEMETRY",
      "cmdId": 1,
      "keys": "temperature,humidity"
    }
  ]
}

3. 订阅多个设备

{
  "tsSubCmds": [
    {
      "entityType": "DEVICE",
      "entityId": "{deviceId1}",
      "scope": "LATEST_TELEMETRY",
      "cmdId": 1
    },
    {
      "entityType": "DEVICE",
      "entityId": "{deviceId2}",
      "scope": "LATEST_TELEMETRY",
      "cmdId": 2
    }
  ]
}

使用示例

完整示例:查询和监控设备

Python 脚本示例:

import requests
import websockets
import json
import asyncio
from datetime import datetime

# 配置
THINGSBOARD_HOST = "localhost:8080"
JWT_TOKEN = "YOUR_JWT_TOKEN"
DEVICE_ID = "YOUR_DEVICE_ID"

def get_latest_telemetry():
    """获取最新遥测数据"""
    url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/timeseries"
    headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
    
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code}")
        return None

def get_device_status():
    """获取设备状态"""
    url = f"http://{THINGSBOARD_HOST}/api/plugins/telemetry/DEVICE/{DEVICE_ID}/values/attributes/SERVER_SCOPE"
    params = {"keys": "active,lastConnectTime,lastActivityTime"}
    headers = {"X-Authorization": f"Bearer {JWT_TOKEN}"}
    
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error: {response.status_code}")
        return None

async def subscribe_to_device():
    """订阅设备实时更新"""
    uri = f"ws://{THINGSBOARD_HOST}/api/ws/plugins/telemetry?token={JWT_TOKEN}"
    
    async with websockets.connect(uri) as websocket:
        # 订阅遥测和属性
        subscribe_cmd = {
            "tsSubCmds": [
                {
                    "entityType": "DEVICE",
                    "entityId": DEVICE_ID,
                    "scope": "LATEST_TELEMETRY",
                    "cmdId": 1
                }
            ],
            "attrSubCmds": [
                {
                    "entityType": "DEVICE",
                    "entityId": DEVICE_ID,
                    "scope": "SERVER_SCOPE",
                    "cmdId": 2,
                    "keys": "active"
                }
            ]
        }
        
        await websocket.send(json.dumps(subscribe_cmd))
        print("已订阅设备更新,等待数据...")
        
        while True:
            message = await websocket.recv()
            data = json.loads(message)
            
            if 'subscriptionId' in data:
                if data['subscriptionId'] == 1:
                    # 遥测更新
                    telemetry = data.get('data', {})
                    for key, values in telemetry.items():
                        if values:
                            value = values[0]
                            print(f"[{datetime.now()}] 遥测更新 - {key}: {value['value']} (时间: {value['ts']})")
                elif data['subscriptionId'] == 2:
                    # 属性更新
                    attributes = data.get('data', {})
                    for attr in attributes:
                        print(f"[{datetime.now()}] 属性更新 - {attr['key']}: {attr['value']}")

if __name__ == "__main__":
    # 查询当前状态
    print("=== 查询当前遥测数据 ===")
    telemetry = get_latest_telemetry()
    if telemetry:
        for key, values in telemetry.items():
            if values:
                print(f"{key}: {values[0]['value']} (时间: {values[0]['ts']})")
    
    print("\n=== 查询设备状态 ===")
    status = get_device_status()
    if status:
        for attr in status:
            print(f"{attr['key']}: {attr['value']}")
    
    print("\n=== 开始实时订阅 ===")
    asyncio.run(subscribe_to_device())

curl 脚本示例:

#!/bin/bash

THINGSBOARD_HOST="localhost:8080"
JWT_TOKEN="YOUR_JWT_TOKEN"
DEVICE_ID="YOUR_DEVICE_ID"

echo "=== 获取最新遥测数据 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/timeseries" \
  -H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'

echo -e "\n=== 获取设备状态 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/values/attributes/SERVER_SCOPE?keys=active" \
  -H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'

echo -e "\n=== 获取所有遥测键 ==="
curl -s -X GET "http://$THINGSBOARD_HOST/api/plugins/telemetry/DEVICE/$DEVICE_ID/keys/timeseries" \
  -H "X-Authorization: Bearer $JWT_TOKEN" | jq '.'

注意事项

  1. 认证: 所有 API 调用都需要有效的 JWT Token
  2. 权限: 确保用户有权限访问设备(TENANT_ADMINCUSTOMER_USER
  3. 设备 ID 使用设备的 UUID不是设备名称
  4. 时间戳: 所有时间戳都是 UTC 时间,单位是毫秒
  5. WebSocket 订阅是实时的,适合需要即时更新的场景
  6. 轮询 vs 订阅:
    • 轮询REST API适合不频繁查询的场景
    • 订阅WebSocket适合需要实时更新的场景减少服务器负载


外部服务订阅方案(重要)

如果你的外部服务需要订阅设备发送给 ThingsBoard 的实时遥测数据,有以下几种方案:

方案 1通过规则引擎转发到外部 MQTT Broker推荐

适用场景:外部服务可以连接到外部 MQTT Broker 进行订阅

实现步骤

  1. 在 ThingsBoard 规则引擎中添加 MQTT 节点

    • 进入 ThingsBoard Web UI
    • 打开规则链Rule Chain编辑器
    • 添加 "MQTT" 节点External → MQTT
    • 配置外部 MQTT Broker 连接信息
  2. 配置 MQTT 节点

    {
      "host": "external-mqtt-broker.example.com",
      "port": 1883,
      "topicPattern": "thingsboard/telemetry/${deviceName}",
      "clientId": "tb-forwarder",
      "cleanSession": true,
      "retainedMessage": false,
      "ssl": false,
      "credentials": {
        "type": "anonymous"
      }
    }
    
  3. 连接规则链

    [设备遥测数据] → [MQTT 节点] → [外部 MQTT Broker]
                               ↓
                      [外部服务订阅]
    
  4. 外部服务订阅

    import paho.mqtt.client as mqtt
    
    def on_connect(client, userdata, flags, rc):
        client.subscribe("thingsboard/telemetry/+")
    
    def on_message(client, userdata, msg):
        print(f"收到设备数据: {msg.topic} - {msg.payload}")
    
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("external-mqtt-broker.example.com", 1883, 60)
    client.loop_forever()
    

优势

  • 实时转发,延迟低
  • 支持多个外部服务订阅同一主题
  • 解耦 ThingsBoard 和外部服务
  • 支持 QoS、认证等 MQTT 特性

方案 2通过规则引擎转发到 Kafka

适用场景:外部服务可以连接 Kafka 进行消费

实现步骤

  1. 在规则引擎中添加 Kafka 节点

    • 添加 "Kafka" 节点External → Kafka
    • 配置 Kafka Broker 信息
  2. 配置 Kafka 节点

    {
      "bootstrapServers": "kafka-broker:9092",
      "topicPattern": "thingsboard-telemetry",
      "acks": "1",
      "retries": 3,
      "batchSize": 16384,
      "linger": 5,
      "bufferMemory": 33554432
    }
    
  3. 外部服务消费 Kafka

    from kafka import KafkaConsumer
    import json
    
    consumer = KafkaConsumer(
        'thingsboard-telemetry',
        bootstrap_servers=['kafka-broker:9092'],
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    
    for message in consumer:
        print(f"收到设备数据: {message.value}")
    

方案 3通过规则引擎调用 HTTP 接口

适用场景:外部服务提供 HTTP Webhook 接口

实现步骤

  1. 在规则引擎中添加 REST API Call 节点

    • 添加 "REST API Call" 节点External → REST API Call
    • 配置外部服务的 HTTP 端点
  2. 配置 REST API Call 节点

    {
      "restEndpointUrlPattern": "http://external-service:8080/api/telemetry",
      "requestMethod": "POST",
      "headers": {
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY"
      }
    }
    
  3. 外部服务接收数据

    from flask import Flask, request
    
    app = Flask(__name__)
    
    @app.route('/api/telemetry', methods=['POST'])
    def receive_telemetry():
        data = request.json
        print(f"收到设备数据: {data}")
        # 处理数据...
        return {"status": "ok"}, 200
    
    app.run(host='0.0.0.0', port=8080)
    

方案 4直接订阅 ThingsBoard 内部 Kafka如果使用 Kafka 队列)

适用场景ThingsBoard 使用 Kafka 作为消息队列,且外部服务可以访问同一 Kafka 集群

实现步骤

  1. 确认 ThingsBoard 使用的 Kafka 主题

    • ThingsBoard 内部使用以下主题:
      • tb.core.* - 核心服务消息
      • tb.rule-engine.* - 规则引擎消息
      • tb.transport.* - 传输层消息
  2. 外部服务订阅 Kafka

    from kafka import KafkaConsumer
    import json
    
    # 注意:需要了解 ThingsBoard 内部消息格式Protobuf
    consumer = KafkaConsumer(
        'tb.core.telemetry',  # 示例主题,需要根据实际情况调整
        bootstrap_servers=['kafka:9092'],
        group_id='external-service'
    )
    
    for message in consumer:
        # ThingsBoard 使用 Protobuf 格式,需要解析
        print(f"收到消息: {message.value}")
    

注意:此方案需要了解 ThingsBoard 内部消息格式,不推荐。

方案对比

方案 实时性 复杂度 推荐度 适用场景
外部 MQTT Broker 最适合 IoT 场景
Kafka 高吞吐量场景
HTTP Webhook 简单集成场景
直接订阅 Kafka 不推荐

推荐配置示例:外部 MQTT Broker

完整配置流程

  1. 部署外部 MQTT Broker(如 Mosquitto

    docker run -it -p 1883:1883 -p 9001:9001 \
      -v mosquitto.conf:/mosquitto/config/mosquitto.conf \
      eclipse-mosquitto
    
  2. 在 ThingsBoard 规则链中配置

    • 规则链:Root Rule Chain 或设备特定的规则链
    • 节点连接:Message Type SwitchPOST_TELEMETRY_REQUESTMQTT Node
  3. MQTT 节点配置

    {
      "host": "192.168.1.100",
      "port": 1883,
      "topicPattern": "devices/${deviceName}/telemetry",
      "clientId": "tb-forwarder-${serviceId}",
      "cleanSession": true,
      "retainedMessage": false,
      "ssl": false,
      "credentials": {
        "type": "anonymous"
      }
    }
    
  4. 外部服务订阅

    import paho.mqtt.client as mqtt
    import json
    
    def on_connect(client, userdata, flags, rc):
        # 订阅所有设备的遥测数据
        client.subscribe("devices/+/telemetry", qos=1)
        print("已订阅设备遥测数据")
    
    def on_message(client, userdata, msg):
        topic_parts = msg.topic.split('/')
        device_name = topic_parts[1]
        telemetry = json.loads(msg.payload)
    
        print(f"设备: {device_name}")
        print(f"数据: {telemetry}")
    
        # 处理遥测数据...
    
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("192.168.1.100", 1883, 60)
    client.loop_forever()
    

相关文档