修改拓恒数据结构

This commit is contained in:
孙小云 2026-02-10 12:26:52 +08:00
parent 820d087861
commit f0ebbf5732
13 changed files with 763 additions and 343 deletions

View File

@ -2,19 +2,29 @@
## 概述
本模块提供对大疆设备MQTT消息的订阅和处理功能支持监听机场和无人机的各类Topic消息。
本模块提供对拓恒设备MQTT消息的订阅和处理功能支持监听机场和无人机的各类Topic消息。
## 设备分类
| SN 前缀 | 设备类型 | Topic 格式 |
|---------|---------|-----------|
| `TH` 开头 | 拓恒设备 | `thing/product/{SN}/...` |
| `THJS` 开头 | 大疆设备 | `thing/product/{SN}/...` |
| `7C` 开头 | 大疆设备 | `thing/product/{SN}/...` |
| `158` 开头 | 大疆设备 | `thing/product/{SN}/...` |
**注意:本模块只处理 `TH` 开头的拓恒设备,其他前缀的大疆设备会被过滤。**
## 包结构
```
com.ruoyi.device.domain.impl.tuohengmqtt
├── callback/ # 数据回调接口
│ ├── IAirportOsdCallback.java # 机场OSD数据回调
│ ├── IDroneOsdCallback.java # 无人机OSD数据回调
│ ├── IAirportStateCallback.java # 机场State数据回调
│ ├── IEventsCallback.java # Events数据回调
│ ├── IServicesReplyCallback.java # 服务回复回调
│ └── IRequestsCallback.java # 设备请求回调
│ ├── ITuohengRealTimeDataCallback.java # 机场实时数据回调
│ ├── IDroneRealTimeCallback.java # 无人机实时数据回调
│ ├── ITuohengOsdCallback.java # OSD数据回调
│ ├── ITuohengEventsCallback.java # Events数据回调
│ └── IRealTimeBasicCallback.java # 机场基础数据回调
├── config/ # 配置类
│ └── TuohengMqttClientConfig.java
├── handler/ # 消息处理器
@ -22,81 +32,228 @@ com.ruoyi.device.domain.impl.tuohengmqtt
├── manager/ # 客户端管理器
│ └── TuohengMqttClientManager.java
├── model/ # 数据模型
│ ├── TuohengMqttMessage.java # MQTT消息基础结构
│ ├── AirportOsdData.java # 机场OSD数据
│ ├── DroneOsdData.java # 无人机OSD数据
│ ├── AirportStateData.java # 机场State数据
│ ├── TuohengRealTimeData.java # 机场实时数据(天气、电池、传感器、充电器)
│ ├── RealTimeBasicData.java # 机场基础数据版本、MAC、状态
│ ├── DroneRealTimeData.java # 无人机实时数据OSD
│ ├── AirportOsdData.java # OSD数据
│ └── EventsData.java # Events数据
├── service/ # MQTT客户端服务
│ └── TuohengMqttClientService.java
└── example/ # 使用示例
└── TuohengMqttUsageExample.java
└── service/ # MQTT客户端服务
└── TuohengMqttClientService.java
```
## 支持的Topic
### 拓恒设备 Topic
| Topic | 说明 | 示例 |
|-------|------|------|
| `thing/product/{AirportSn}/osd` | 机场OSD数据 | thing/product/7CTDM3D00BVY4C/osd |
| `thing/product/{DroneSn}/osd` | 无人机OSD数据 | thing/product/1581F6Q8D243100C605L/osd |
| `thing/product/{AirportSn}/state` | 机场State数据 | thing/product/7CTDM3D00BVY4C/state |
| `thing/product/{AirportSn}/events` | 机场Events数据 | thing/product/7CTDM3D00BVY4C/events |
| `thing/product/{AirportSn}/services_reply` | 服务回复 | thing/product/7CTDM3D00BVY4C/services_reply |
| `sys/product/{AirportSn}/status` | 设备状态 | sys/product/7CTDM3D00BVY4C/status |
| `thing/product/{AirportSn}/requests` | 设备请求 | thing/product/7CTDM3D00BVY4C/requests |
| `/topic/v1/airportNest/{SN}/realTime/data` | 机场实时数据 | `/topic/v1/airportNest/THJSQ03B2309DN7VQN43/realTime/data` |
| `/topic/v1/airportNest/{SN}/realTime/basic` | 机场实时基础数据 | `/topic/v1/airportNest/THJSQ03B2309DN7VQN43/realTime/basic` |
| `/topic/v1/airportDrone/{SN}/realTime/data` | 无人机实时数据 | `/topic/v1/airportDrone/THJSQ03B2309DN7VQN43/realTime/data` |
| `thing/product/{SN}/osd` | OSD数据 | `thing/product/THXXX/.../osd` |
| `thing/product/{SN}/events` | Events数据 | `thing/product/THXXX/.../events` |
| `thing/product/{SN}/services` | Services数据 | `thing/product/THXXX/.../services` |
| `thing/product/{SN}/services_reply` | Services Reply | `thing/product/THXXX/.../services_reply` |
### 数据模型说明
#### TuohengRealTimeData - 机场实时数据
```json
{
"nestWeather": {
"msg": "Op succeed",
"code": 0,
"data": {
"rainfall": "0.00",
"windLevel": "1.00",
"windDir": "6.00",
"windSpeed": "0.46",
"rainFlag": 0,
"windAngle": "305.00"
},
"sender": "DroneNest",
"fun": "query_weather"
},
"droneBattery": {
"msg": "success",
"code": 0,
"data": {
"cell_1Voltage": "4.08",
"Battery_level": 88,
"totalVoltage": "24.47",
"bCharging": 0,
"Battery_health": 98
},
"sender": "DroneNest",
"fun": "query_battery"
},
"nestInnerSensor": {
"msg": "success",
"code": 0,
"data": {
"innerHum": "44.3",
"innerTemp": "15.4"
},
"sender": "DroneNest",
"fun": "query_sensor"
},
"nestCharger": {
"msg": "success",
"code": 0,
"data": {
"startCharge": "2026-02-03 13:19:12",
"current": "0.00",
"bCharging": 2,
"capacity": "32.00"
},
"sender": "DroneNest",
"fun": "query_charger"
}
}
```
#### RealTimeBasicData - 机场基础数据
```json
{
"msg": "Op succeed!",
"code": 0,
"data": {
"airportID": "THJSQ03B2309S1R2R3Z9",
"version": {
"software": "2.5.4.10",
"hardware": "2.5.4.10"
},
"mac": "48:57:67:0f:50:1b",
"status": "online"
},
"sender": "DroneNest",
"send_timestamp": 1770171892818,
"fun": "query_basic"
}
```
#### DroneRealTimeData - 无人机实时数据
```json
{
"msg": "操作成功",
"code": 0,
"data": {
"deviceid": "THJSQ03B2309S99DWDL4",
"battery_remain": "84",
"voltage": "24.37",
"mode": "stabilize",
"armed": "false",
"alt": "4.78",
"lat": 0,
"lon": 0,
"yaw": 356.5583,
"pitch": -0.1593773,
"roll": -0.7585182,
"hspeed": "0.01",
"vspeed": "-0.14",
"flytime": 0,
"mileage": "0"
}
}
```
## 使用方法
### 1. 创建MQTT客户端配置
### 1. 配置文件
`application.yml` 中配置:
```yaml
tuoheng:
mqtt:
host: mqtt.t-aaron.com
port: 10883
client-id: ThingsBoard_gateway
username: admin
password: admin
connection-timeout: 30
keep-alive-interval: 60
auto-reconnect: true
clean-session: false
```
### 2. 服务自动初始化
`TuohengService` 在应用启动时自动初始化MQTT客户端并注册回调
```java
TuohengMqttClientConfig config = TuohengMqttClientConfig.builder()
.host("your-mqtt-host")
.port(1883)
.clientId("tuoheng-client-1")
.username("your-username")
.password("your-password")
@Service
@Slf4j
public class TuohengService {
@Autowired
private TuohengMqttClientManager clientManager;
@Autowired
private TuohengMqttProperties mqttProperties;
@EventListener(ApplicationReadyEvent.class)
public void onApplicationReady() {
TuohengMqttClientConfig config = TuohengMqttClientConfig.builder()
.host(mqttProperties.getHost())
.port(mqttProperties.getPort())
.clientId(mqttProperties.getClientId())
.username(mqttProperties.getUsername())
.password(mqttProperties.getPassword())
.connectionTimeout(mqttProperties.getConnectionTimeout())
.keepAliveInterval(mqttProperties.getKeepAliveInterval())
.autoReconnect(mqttProperties.getAutoReconnect())
.cleanSession(mqttProperties.getCleanSession())
.useSharedSubscription(true)
.sharedGroupName("tuoheng-group")
.build();
```
### 2. 创建客户端
clientManager.initClient(config);
```java
@Autowired
private TuohengMqttClientManager manager;
TuohengMqttMessageHandler handler = clientManager.getHandler();
String clientId = manager.createClient(config);
```
### 3. 注册回调
```java
var handler = manager.getHandler(clientId);
// 设置机场与无人机的SN映射
Map<String, String> mapping = new HashMap<>();
mapping.put("7CTDM3D00BVY4C", "1581F6Q8D243100C605L");
handler.setAirportDroneMapping(mapping);
// 注册各种回调
handler.registerAirportOsdCallback(new IAirportOsdCallback() {
// 注册回调
handler.registerRealTimeDataCallback(new ITuohengRealTimeDataCallback() {
@Override
public void onAirportOsdData(String airportSn, AirportOsdData data) {
// 处理机场OSD数据
public void onRealTimeData(String deviceSn, TuohengRealTimeData data) {
log.info("收到机场实时数据: {}", deviceSn);
}
});
});
handler.registerRealTimeBasicCallback(new IRealTimeBasicCallback() {
@Override
public void onRealTimeBasicData(String deviceSn, RealTimeBasicData data) {
log.info("收到机场基础数据: {}", deviceSn);
}
});
}
}
```
### 4. 发送消息
### 3. 设备过滤
```java
var client = manager.getClient(clientId);
client.publish("thing/product/7CTDM3D00BVY4C/services", message);
```
本模块会自动过滤非拓恒设备:
| Topic 类型 | 过滤规则 |
|-----------|---------|
| `thing/product/{SN}` | 只处理 `TH` 开头的SN跳过 `7C`/`158`/`THJS` 开头的大疆设备 |
| `/topic/v1/...` | 全部是拓恒设备,直接处理 |
### 4. 消息路由
| Topic 类型 | 回调接口 |
|-----------|---------|
| `/topic/v1/airportNest/+/realTime/data` | `ITuohengRealTimeDataCallback` |
| `/topic/v1/airportNest/+/realTime/basic` | `IRealTimeBasicCallback` |
| `/topic/v1/airportDrone/+/realTime/data` | `IDroneRealTimeCallback` |
| `thing/product/+/osd` | `ITuohengOsdCallback` |
| `thing/product/+/events` | `ITuohengEventsCallback` |
## 注意事项
1. 机场SN以 `7C` 开头无人机SN以 `158` 开头
2. 需要正确配置机场与无人机的SN映射关系
3. 使用MQTT 5.0协议 (Eclipse Paho MQTT v5)
4. 支持共享订阅,可通过配置开启
1. 本模块只处理拓恒设备SN 以 `TH` 开头)
2. 使用共享订阅,适合集群环境
3. 支持自动重连
4. 数据库中会过滤大疆设备THJS开头
5. 日志级别设置为 DEBUG 可以看到详细的MQTT消息处理过程
6. 无人机实时数据airportDrone目前只在日志中输出可以通过添加回调接口来处理

View File

@ -0,0 +1,7 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneRealTimeData;
public interface IDroneRealTimeCallback {
void onDroneRealTimeData(String deviceSn, DroneRealTimeData data);
}

View File

@ -0,0 +1,8 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.RealTimeBasicData;
public interface IRealTimeBasicCallback {
void onRealTimeBasicData(String deviceSn, RealTimeBasicData data);
}

View File

@ -0,0 +1,8 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
public interface ITuohengEventsCallback {
void onEventsData(String deviceSn, EventsData data);
}

View File

@ -0,0 +1,8 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
public interface ITuohengOsdCallback {
void onOsdData(String deviceSn, AirportOsdData data);
}

View File

@ -0,0 +1,8 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.callback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengRealTimeData;
public interface ITuohengRealTimeDataCallback {
void onRealTimeData(String deviceSn, TuohengRealTimeData data);
}

View File

@ -1,23 +1,21 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneRealTimeCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRealTimeBasicCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengRealTimeDataCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneRealTimeData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengMqttMessage;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.RealTimeBasicData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengRealTimeData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@Slf4j
@ -26,115 +24,89 @@ public class TuohengMqttMessageHandler {
private final ObjectMapper objectMapper = new ObjectMapper();
private final List<IAirportOsdCallback> airportOsdCallbacks = new ArrayList<>();
private final List<ITuohengRealTimeDataCallback> realTimeDataCallbacks = new ArrayList<>();
private final List<ITuohengOsdCallback> osdCallbacks = new ArrayList<>();
private final List<ITuohengEventsCallback> eventsCallbacks = new ArrayList<>();
private final List<IRealTimeBasicCallback> realTimeBasicCallbacks = new ArrayList<>();
private final List<IDroneRealTimeCallback> droneRealTimeCallbacks = new ArrayList<>();
private final List<IDroneOsdCallback> droneOsdCallbacks = new ArrayList<>();
private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+");
private final List<IAirportStateCallback> airportStateCallbacks = new ArrayList<>();
private final List<IEventsCallback> eventsCallbacks = new ArrayList<>();
private final List<IServicesReplyCallback> servicesReplyCallbacks = new ArrayList<>();
private final List<IRequestsCallback> requestsCallbacks = new ArrayList<>();
private static final Pattern DRONE_SN_PATTERN = Pattern.compile("^158[0-9A-Z]+$");
private static final Pattern AIRPORT_SN_PATTERN = Pattern.compile("^7C[0-9A-Z]+$");
private Map<String, String> airportDroneMapping;
public void setAirportDroneMapping(Map<String, String> mapping) {
this.airportDroneMapping = mapping;
}
public void registerAirportOsdCallback(IAirportOsdCallback callback) {
if (callback != null && !airportOsdCallbacks.contains(callback)) {
airportOsdCallbacks.add(callback);
log.info("注册机场OSD数据回调: {}", callback.getClass().getSimpleName());
public void registerRealTimeDataCallback(ITuohengRealTimeDataCallback callback) {
if (callback != null && !realTimeDataCallbacks.contains(callback)) {
realTimeDataCallbacks.add(callback);
log.info("注册实时数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerDroneOsdCallback(IDroneOsdCallback callback) {
if (callback != null && !droneOsdCallbacks.contains(callback)) {
droneOsdCallbacks.add(callback);
log.info("注册无人机OSD数据回调: {}", callback.getClass().getSimpleName());
public void registerOsdCallback(ITuohengOsdCallback callback) {
if (callback != null && !osdCallbacks.contains(callback)) {
osdCallbacks.add(callback);
log.info("注册OSD数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerAirportStateCallback(IAirportStateCallback callback) {
if (callback != null && !airportStateCallbacks.contains(callback)) {
airportStateCallbacks.add(callback);
log.info("注册机场State数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerEventsCallback(IEventsCallback callback) {
public void registerEventsCallback(ITuohengEventsCallback callback) {
if (callback != null && !eventsCallbacks.contains(callback)) {
eventsCallbacks.add(callback);
log.info("注册Events数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerServicesReplyCallback(IServicesReplyCallback callback) {
if (callback != null && !servicesReplyCallbacks.contains(callback)) {
servicesReplyCallbacks.add(callback);
log.info("注册ServicesReply数据回调: {}", callback.getClass().getSimpleName());
public void registerRealTimeBasicCallback(IRealTimeBasicCallback callback) {
if (callback != null && !realTimeBasicCallbacks.contains(callback)) {
realTimeBasicCallbacks.add(callback);
log.info("注册实时基础数据回调: {}", callback.getClass().getSimpleName());
}
}
public void registerRequestsCallback(IRequestsCallback callback) {
if (callback != null && !requestsCallbacks.contains(callback)) {
requestsCallbacks.add(callback);
log.info("注册Requests数据回调: {}", callback.getClass().getSimpleName());
public void registerDroneRealTimeCallback(IDroneRealTimeCallback callback) {
if (callback != null && !droneRealTimeCallbacks.contains(callback)) {
droneRealTimeCallbacks.add(callback);
log.info("注册无人机实时数据回调: {}", callback.getClass().getSimpleName());
}
}
public void handleMessage(String topic, String payload) {
try {
log.debug("收到MQTT消息 - Topic: {}, Payload: {}", topic, payload);
log.debug("收到MQTT消息 - Topic: {}", topic);
String deviceSn = extractDeviceSnFromTopic(topic);
String messageType = extractMessageTypeFromTopic(topic);
if (deviceSn == null || messageType == null) {
log.warn("无法从Topic解析设备SN或消息类型: {}", topic);
if (deviceSn == null) {
log.warn("无法从Topic解析设备SN: {}", topic);
return;
}
String droneSn = null;
String airportSn = null;
if (isAirportSn(deviceSn)) {
airportSn = deviceSn;
if (airportDroneMapping != null) {
droneSn = airportDroneMapping.get(deviceSn);
if (isProductTopic(topic)) {
if (!isTuohengSn(deviceSn)) {
log.debug("跳过大疆设备 - SN: {}", deviceSn);
return;
}
} else if (isDroneSn(deviceSn)) {
droneSn = deviceSn;
}
@SuppressWarnings("unchecked")
TuohengMqttMessage<Map<String, Object>> message = objectMapper.readValue(
payload,
objectMapper.getTypeFactory().constructParametricType(
TuohengMqttMessage.class,
Map.class
)
);
String messageType = extractMessageTypeFromTopic(topic);
if ("osd".equals(messageType)) {
handleOsdMessage(deviceSn, droneSn, airportSn, message);
} else if ("state".equals(messageType)) {
handleStateMessage(deviceSn, droneSn, airportSn, message);
} else if ("events".equals(messageType)) {
handleEventsMessage(deviceSn, message);
} else if ("services_reply".equals(messageType)) {
handleServicesReplyMessage(deviceSn, message);
} else if ("requests".equals(messageType)) {
handleRequestsMessage(deviceSn, message);
} else if ("status".equals(messageType)) {
handleStatusMessage(deviceSn, message);
if (messageType == null) {
log.warn("无法从Topic解析消息类型: {}", topic);
return;
}
switch (messageType) {
case "realTime/data":
handleRealTimeData(deviceSn, payload, topic);
break;
case "realTime/basic":
handleRealTimeBasicData(deviceSn, payload);
break;
case "osd":
handleOsdData(deviceSn, payload);
break;
case "events":
handleEventsData(deviceSn, payload);
break;
default:
log.debug("未知消息类型: {}", messageType);
}
} catch (Exception e) {
@ -142,126 +114,86 @@ public class TuohengMqttMessageHandler {
}
}
private void handleOsdMessage(String deviceSn, String droneSn, String airportSn, TuohengMqttMessage<Map<String, Object>> message) {
private void handleRealTimeData(String deviceSn, String payload, String topic) {
try {
if (isDroneSn(deviceSn) && droneSn != null) {
DroneOsdData droneOsdData = objectMapper.convertValue(message.getData(), DroneOsdData.class);
log.debug("处理无人机OSD数据 - SN: {}, Airport: {}", droneSn, airportSn);
for (IDroneOsdCallback callback : droneOsdCallbacks) {
if (topic.contains("airportNest")) {
TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class);
log.debug("处理机场实时数据 - 设备SN: {}", deviceSn);
for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) {
try {
callback.onDroneOsdData(droneSn, airportSn, droneOsdData);
callback.onRealTimeData(deviceSn, data);
} catch (Exception e) {
log.error("无人机OSD数据回调执行失败: {}", e.getMessage(), e);
log.error("实时数据回调执行失败: {}", e.getMessage(), e);
}
}
} else if (isAirportSn(deviceSn) && airportSn != null) {
AirportOsdData airportOsdData = objectMapper.convertValue(message.getData(), AirportOsdData.class);
log.debug("处理机场OSD数据 - SN: {}", airportSn);
for (IAirportOsdCallback callback : airportOsdCallbacks) {
} else if (topic.contains("airportDrone")) {
DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class);
log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn);
for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) {
try {
callback.onAirportOsdData(airportSn, airportOsdData);
callback.onDroneRealTimeData(deviceSn, data);
} catch (Exception e) {
log.error("机场OSD数据回调执行失败: {}", e.getMessage(), e);
log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理OSD消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
log.error("处理实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleStateMessage(String deviceSn, String droneSn, String airportSn, TuohengMqttMessage<Map<String, Object>> message) {
private void handleRealTimeBasicData(String deviceSn, String payload) {
try {
if (isAirportSn(deviceSn) && airportSn != null) {
AirportStateData airportStateData = objectMapper.convertValue(message.getData(), AirportStateData.class);
log.debug("处理机场State数据 - SN: {}, Drone: {}", airportSn, droneSn);
RealTimeBasicData data = objectMapper.readValue(payload, RealTimeBasicData.class);
log.debug("处理实时基础数据 - 设备SN: {}", deviceSn);
for (IAirportStateCallback callback : airportStateCallbacks) {
for (IRealTimeBasicCallback callback : realTimeBasicCallbacks) {
try {
callback.onAirportStateData(airportSn, droneSn, airportStateData);
callback.onRealTimeBasicData(deviceSn, data);
} catch (Exception e) {
log.error("机场State数据回调执行失败: {}", e.getMessage(), e);
}
log.error("实时基础数据回调执行失败: {}", e.getMessage(), e);
}
}
} catch (Exception e) {
log.error("处理State消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
log.error("处理实时基础数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleEventsMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
private void handleOsdData(String deviceSn, String payload) {
try {
if (isAirportSn(deviceSn)) {
EventsData eventsData = objectMapper.convertValue(message.getData(), EventsData.class);
log.debug("处理Events数据 - SN: {}", deviceSn);
AirportOsdData data = objectMapper.readValue(payload, AirportOsdData.class);
log.debug("处理OSD数据 - 设备SN: {}", deviceSn);
for (IEventsCallback callback : eventsCallbacks) {
for (ITuohengOsdCallback callback : osdCallbacks) {
try {
callback.onEventsData(deviceSn, eventsData);
callback.onOsdData(deviceSn, data);
} catch (Exception e) {
log.error("OSD数据回调执行失败: {}", e.getMessage(), e);
}
}
} catch (Exception e) {
log.error("处理OSD数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleEventsData(String deviceSn, String payload) {
try {
EventsData data = objectMapper.readValue(payload, EventsData.class);
log.debug("处理Events数据 - 设备SN: {}", deviceSn);
for (ITuohengEventsCallback callback : eventsCallbacks) {
try {
callback.onEventsData(deviceSn, data);
} catch (Exception e) {
log.error("Events数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理Events消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
log.error("处理Events数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleServicesReplyMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isAirportSn(deviceSn)) {
log.debug("处理ServicesReply数据 - SN: {}, Method: {}", deviceSn, message.getMethod());
for (IServicesReplyCallback callback : servicesReplyCallbacks) {
try {
callback.onServicesReplyData(deviceSn, message.getData());
} catch (Exception e) {
log.error("ServicesReply数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理ServicesReply消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleRequestsMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
try {
if (isAirportSn(deviceSn)) {
log.debug("处理Requests数据 - SN: {}, Method: {}", deviceSn, message.getMethod());
for (IRequestsCallback callback : requestsCallbacks) {
try {
callback.onRequestsData(deviceSn, message.getMethod(), message.getData());
} catch (Exception e) {
log.error("Requests数据回调执行失败: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("处理Requests消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleStatusMessage(String deviceSn, TuohengMqttMessage<Map<String, Object>> message) {
log.debug("处理Status数据 - SN: {}", deviceSn);
}
private String extractDeviceSnFromTopic(String topic) {
if (topic == null) {
return null;
}
String[] parts = topic.split("/");
if (parts.length >= 3) {
return parts[2];
}
return null;
}
private String extractMessageTypeFromTopic(String topic) {
if (topic == null) {
return null;
}
@ -272,17 +204,28 @@ public class TuohengMqttMessageHandler {
return null;
}
private boolean isDroneSn(String sn) {
if (sn == null) {
return false;
private String extractMessageTypeFromTopic(String topic) {
if (topic == null) {
return null;
}
return DRONE_SN_PATTERN.matcher(sn).matches();
String[] parts = topic.split("/");
if (parts.length >= 5) {
return parts[4];
}
if (parts.length >= 4) {
return parts[3];
}
return null;
}
private boolean isAirportSn(String sn) {
private boolean isProductTopic(String topic) {
return topic != null && topic.startsWith("thing/product/");
}
private boolean isTuohengSn(String sn) {
if (sn == null) {
return false;
}
return AIRPORT_SN_PATTERN.matcher(sn).matches();
return TUOHENG_SN_PATTERN.matcher(sn).matches();
}
}

View File

@ -16,6 +16,9 @@ public class AirportOsdData {
@JsonProperty("working_current")
private String workingCurrent;
@JsonProperty("job_number")
private String jobNumber;
@JsonProperty("network_state")
private String networkState;

View File

@ -0,0 +1,137 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class DroneRealTimeData {
@JsonProperty("msg")
private String msg;
@JsonProperty("code")
private Integer code;
@JsonProperty("data")
private DroneInfo data;
@Data
public static class DroneInfo {
@JsonProperty("date")
private String date;
@JsonProperty("zoomvalue")
private String zoomvalue;
@JsonProperty("arrwayAirFilePoint")
private String arrwayAirFilePoint;
@JsonProperty("roll")
private Double roll;
@JsonProperty("volumn")
private String volumn;
@JsonProperty("lon")
private Double lon;
@JsonProperty("arrwaypointvalue")
private String arrwaypointvalue;
@JsonProperty("Timestamp")
private String timestamp;
@JsonProperty("landingState")
private String landingState;
@JsonProperty("mode")
private String mode;
@JsonProperty("landing_target_y")
private String landingTargetY;
@JsonProperty("distToHome")
private String distToHome;
@JsonProperty("landing_target_x")
private String landingTargetX;
@JsonProperty("datetime")
private Integer datetime;
@JsonProperty("satcount")
private Integer satcount;
@JsonProperty("gpssingal")
private Integer gpssingal;
@JsonProperty("battery_remain")
private String batteryRemain;
@JsonProperty("landing_target_z")
private String landingTargetZ;
@JsonProperty("pitch")
private Double pitch;
@JsonProperty("camerapitch")
private String camerapitch;
@JsonProperty("obstacle")
private String obstacle;
@JsonProperty("lat")
private Double lat;
@JsonProperty("hspeed")
private String hspeed;
@JsonProperty("mileage")
private String mileage;
@JsonProperty("altasl")
private String altasl;
@JsonProperty("laser_distance")
private String laserDistance;
@JsonProperty("tsingal")
private Integer tsingal;
@JsonProperty("flytime")
private Integer flytime;
@JsonProperty("alt")
private String alt;
@JsonProperty("ttlState")
private String ttlState;
@JsonProperty("deviceid")
private String deviceid;
@JsonProperty("yaw")
private Double yaw;
@JsonProperty("voltage")
private String voltage;
@JsonProperty("camerayaw")
private String camerayaw;
@JsonProperty("vspeed")
private String vspeed;
@JsonProperty("armed")
private String armed;
@JsonProperty("ysingal")
private Integer ysingal;
@JsonProperty("altasl2")
private String altasl2;
@JsonProperty("cameraroll")
private String cameraroll;
}
}

View File

@ -0,0 +1,50 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class RealTimeBasicData {
@JsonProperty("msg")
private String msg;
@JsonProperty("code")
private Integer code;
@JsonProperty("data")
private BasicInfo data;
@JsonProperty("sender")
private String sender;
@JsonProperty("send_timestamp")
private Long sendTimestamp;
@JsonProperty("fun")
private String fun;
@Data
public static class BasicInfo {
@JsonProperty("airportID")
private String airportId;
@JsonProperty("version")
private VersionInfo version;
@JsonProperty("mac")
private String mac;
@JsonProperty("status")
private String status;
}
@Data
public static class VersionInfo {
@JsonProperty("software")
private String software;
@JsonProperty("hardware")
private String hardware;
}
}

View File

@ -0,0 +1,160 @@
package com.ruoyi.device.domain.impl.tuohengmqtt.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.util.Map;
@Data
public class TuohengRealTimeData {
@JsonProperty("nestWeather")
private TuohengResponse<TuohengWeatherData> nestWeather;
@JsonProperty("droneBattery")
private TuohengResponse<TuohengBatteryData> droneBattery;
@JsonProperty("nestInnerSensor")
private TuohengResponse<TuohengSensorData> nestInnerSensor;
@JsonProperty("nestCharger")
private TuohengResponse<TuohengChargerData> nestCharger;
@Data
public static class TuohengResponse<T> {
@JsonProperty("msg")
private String msg;
@JsonProperty("code")
private Integer code;
@JsonProperty("data")
private T data;
@JsonProperty("sender")
private String sender;
@JsonProperty("fun")
private String fun;
}
@Data
public static class TuohengWeatherData {
@JsonProperty("rainfall")
private String rainfall;
@JsonProperty("windLevel")
private String windLevel;
@JsonProperty("windDir")
private String windDir;
@JsonProperty("windSpeed")
private String windSpeed;
@JsonProperty("rainFlag")
private Integer rainFlag;
@JsonProperty("windAngle")
private String windAngle;
}
@Data
public static class TuohengBatteryData {
@JsonProperty("cell_1Voltage")
private String cell1Voltage;
@JsonProperty("cell_2Voltage")
private String cell2Voltage;
@JsonProperty("cell_3Voltage")
private String cell3Voltage;
@JsonProperty("cell_4Voltage")
private String cell4Voltage;
@JsonProperty("cell_5Voltage")
private String cell5Voltage;
@JsonProperty("cell_6Voltage")
private String cell6Voltage;
@JsonProperty("Battery_level")
private Integer batteryLevel;
@JsonProperty("totalVoltage")
private String totalVoltage;
@JsonProperty("bCharging")
private Integer bCharging;
@JsonProperty("Battery_health")
private Integer batteryHealth;
@JsonProperty("chargeTime")
private Long chargeTime;
@JsonProperty("num_cycles")
private Integer numCycles;
@JsonProperty("bState")
private Integer bState;
@JsonProperty("bPowerON")
private Integer bPowerON;
@JsonProperty("Discharge_current")
private String dischargeCurrent;
@JsonProperty("bBattPower")
private Integer bBattPower;
@JsonProperty("startCharge")
private String startCharge;
@JsonProperty("versions")
private Integer versions;
@JsonProperty("bDesc")
private String bDesc;
@JsonProperty("cellTemp")
private String cellTemp;
@JsonProperty("mosTemp")
private String mosTemp;
@JsonProperty("model")
private String model;
}
@Data
public static class TuohengSensorData {
@JsonProperty("innerHum")
private String innerHum;
@JsonProperty("innerTemp")
private String innerTemp;
}
@Data
public static class TuohengChargerData {
@JsonProperty("startCharge")
private String startCharge;
@JsonProperty("current")
private String current;
@JsonProperty("bCharging")
private Integer bCharging;
@JsonProperty("chargeTime")
private Long chargeTime;
@JsonProperty("capacity")
private String capacity;
@JsonProperty("voltage")
private String voltage;
}
}

View File

@ -20,12 +20,13 @@ public class TuohengMqttClientService {
private final TuohengMqttMessageHandler messageHandler;
private MqttClient mqttClient;
private static final String AIRPORT_OSD_TOPIC = "thing/product/+/osd";
private static final String AIRPORT_STATE_TOPIC = "thing/product/+/state";
private static final String AIRPORT_EVENTS_TOPIC = "thing/product/+/events";
private static final String SERVICES_REPLY_TOPIC = "thing/product/+/services_reply";
private static final String STATUS_TOPIC = "sys/product/+/status";
private static final String REQUESTS_TOPIC = "thing/product/+/requests";
private static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data";
private static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic";
private static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data";
private static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd";
private static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events";
private static final String PRODUCT_SERVICES_TOPIC = "thing/product/+/services";
private static final String PRODUCT_SERVICES_REPLY_TOPIC = "thing/product/+/services_reply";
public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) {
this.config = config;
@ -118,47 +119,24 @@ public class TuohengMqttClientService {
return;
}
String airportOsdTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_OSD_TOPIC)
: AIRPORT_OSD_TOPIC;
String[] topics = {
AIRPORT_NEST_REALTIME_TOPIC,
AIRPORT_NEST_BASIC_TOPIC,
AIRPORT_DRONE_REALTIME_TOPIC,
PRODUCT_OSD_TOPIC,
PRODUCT_EVENTS_TOPIC,
PRODUCT_SERVICES_TOPIC,
PRODUCT_SERVICES_REPLY_TOPIC
};
String airportStateTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_STATE_TOPIC)
: AIRPORT_STATE_TOPIC;
for (String topic : topics) {
String subscribeTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), topic)
: topic;
String airportEventsTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_EVENTS_TOPIC)
: AIRPORT_EVENTS_TOPIC;
String servicesReplyTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), SERVICES_REPLY_TOPIC)
: SERVICES_REPLY_TOPIC;
String statusTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), STATUS_TOPIC)
: STATUS_TOPIC;
String requestsTopic = config.getUseSharedSubscription()
? String.format("$share/%s/%s", config.getSharedGroupName(), REQUESTS_TOPIC)
: REQUESTS_TOPIC;
mqttClient.subscribe(airportOsdTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportOsdTopic);
mqttClient.subscribe(airportStateTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportStateTopic);
mqttClient.subscribe(airportEventsTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportEventsTopic);
mqttClient.subscribe(servicesReplyTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), servicesReplyTopic);
mqttClient.subscribe(statusTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), statusTopic);
mqttClient.subscribe(requestsTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), requestsTopic);
mqttClient.subscribe(subscribeTopic, 1);
log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), subscribeTopic);
}
} catch (Exception e) {
log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e);

View File

@ -1,28 +1,24 @@
package com.ruoyi.device.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.device.domain.api.IDockAircraftDomain;
import com.ruoyi.device.domain.api.IDockDomain;
import com.ruoyi.device.domain.api.IAircraftDomain;
import com.ruoyi.device.domain.api.IDeviceDomain;
import com.ruoyi.device.domain.model.DockAircraft;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengEventsCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengOsdCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengRealTimeDataCallback;
import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig;
import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler;
import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData;
import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengRealTimeData;
import com.ruoyi.device.domain.model.Aircraft;
import com.ruoyi.device.domain.model.Device;
import com.ruoyi.device.domain.model.Dock;
import com.ruoyi.device.domain.model.DockAircraft;
import com.ruoyi.device.service.config.TuohengMqttProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
@ -78,13 +74,12 @@ public class TuohengService {
TuohengMqttMessageHandler handler = clientManager.getHandler();
Map<String, String> mapping = loadAirportDroneMapping();
handler.setAirportDroneMapping(mapping);
handler.registerAirportOsdCallback(new IAirportOsdCallback() {
handler.registerRealTimeDataCallback(new ITuohengRealTimeDataCallback() {
@Override
public void onAirportOsdData(String airportSn, AirportOsdData data) {
log.info("========== 收到机场OSD数据 ==========");
log.info("机场SN: {}", airportSn);
public void onRealTimeData(String deviceSn, TuohengRealTimeData data) {
log.info("========== 收到拓恒实时数据 ==========");
log.info("设备SN: {}", deviceSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
@ -94,11 +89,11 @@ public class TuohengService {
}
});
handler.registerDroneOsdCallback(new IDroneOsdCallback() {
handler.registerOsdCallback(new ITuohengOsdCallback() {
@Override
public void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data) {
log.info("========== 收到无人机OSD数据 ==========");
log.info("无人机SN: {}, 机场SN: {}", droneSn, airportSn);
public void onOsdData(String deviceSn, AirportOsdData data) {
log.info("========== 收到拓恒OSD数据 ==========");
log.info("设备SN: {}", deviceSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
@ -108,53 +103,11 @@ public class TuohengService {
}
});
handler.registerAirportStateCallback(new IAirportStateCallback() {
handler.registerEventsCallback(new ITuohengEventsCallback() {
@Override
public void onAirportStateData(String airportSn, String droneSn, AirportStateData data) {
log.info("========== 收到机场State数据 ==========");
log.info("机场SN: {}, 无人机SN: {}", airportSn, droneSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerEventsCallback(new IEventsCallback() {
@Override
public void onEventsData(String airportSn, EventsData data) {
log.info("========== 收到Events数据 ==========");
log.info("机场SN: {}", airportSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerServicesReplyCallback(new IServicesReplyCallback() {
@Override
public void onServicesReplyData(String airportSn, Map<String, Object> data) {
log.info("========== 收到ServicesReply数据 ==========");
log.info("机场SN: {}", airportSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {
log.error("序列化数据失败", e);
}
log.info("=====================================");
}
});
handler.registerRequestsCallback(new IRequestsCallback() {
@Override
public void onRequestsData(String airportSn, String method, Map<String, Object> data) {
log.info("========== 收到Requests数据 ==========");
log.info("机场SN: {}, Method: {}", airportSn, method);
public void onEventsData(String deviceSn, EventsData data) {
log.info("========== 收到拓恒Events数据 ==========");
log.info("设备SN: {}", deviceSn);
try {
log.info("数据内容: {}", objectMapper.writeValueAsString(data));
} catch (Exception e) {