diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/README.md b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/README.md new file mode 100644 index 0000000..10a2000 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/README.md @@ -0,0 +1,102 @@ +# TuohengMqtt 模块说明 + +## 概述 + +本模块提供对大疆设备MQTT消息的订阅和处理功能,支持监听机场和无人机的各类Topic消息。 + +## 包结构 + +``` +com.ruoyi.device.domain.impl.tuohengmqtt +├── callback/ # 数据回调接口 +│ ├── IAirportOsdCallback.java # 机场OSD数据回调 +│ ├── IDroneOsdCallback.java # 无人机OSD数据回调 +│ ├── IAirportStateCallback.java # 机场State数据回调 +│ ├── IEventsCallback.java # Events数据回调 +│ ├── IServicesReplyCallback.java # 服务回复回调 +│ └── IRequestsCallback.java # 设备请求回调 +├── config/ # 配置类 +│ └── TuohengMqttClientConfig.java +├── handler/ # 消息处理器 +│ └── TuohengMqttMessageHandler.java +├── manager/ # 客户端管理器 +│ └── TuohengMqttClientManager.java +├── model/ # 数据模型 +│ ├── TuohengMqttMessage.java # MQTT消息基础结构 +│ ├── AirportOsdData.java # 机场OSD数据 +│ ├── DroneOsdData.java # 无人机OSD数据 +│ ├── AirportStateData.java # 机场State数据 +│ └── EventsData.java # Events数据 +├── service/ # MQTT客户端服务 +│ └── TuohengMqttClientService.java +└── example/ # 使用示例 + └── TuohengMqttUsageExample.java +``` + +## 支持的Topic + +| Topic | 说明 | 示例 | +|-------|------|------| +| `thing/product/{AirportSn}/osd` | 机场OSD数据 | thing/product/7CTDM3D00BVY4C/osd | +| `thing/product/{DroneSn}/osd` | 无人机OSD数据 | thing/product/1581F6Q8D243100C605L/osd | +| `thing/product/{AirportSn}/state` | 机场State数据 | thing/product/7CTDM3D00BVY4C/state | +| `thing/product/{AirportSn}/events` | 机场Events数据 | thing/product/7CTDM3D00BVY4C/events | +| `thing/product/{AirportSn}/services_reply` | 服务回复 | thing/product/7CTDM3D00BVY4C/services_reply | +| `sys/product/{AirportSn}/status` | 设备状态 | sys/product/7CTDM3D00BVY4C/status | +| `thing/product/{AirportSn}/requests` | 设备请求 | thing/product/7CTDM3D00BVY4C/requests | + +## 使用方法 + +### 1. 创建MQTT客户端配置 + +```java +TuohengMqttClientConfig config = TuohengMqttClientConfig.builder() + .host("your-mqtt-host") + .port(1883) + .clientId("tuoheng-client-1") + .username("your-username") + .password("your-password") + .build(); +``` + +### 2. 创建客户端 + +```java +@Autowired +private TuohengMqttClientManager manager; + +String clientId = manager.createClient(config); +``` + +### 3. 注册回调 + +```java +var handler = manager.getHandler(clientId); + +// 设置机场与无人机的SN映射 +Map mapping = new HashMap<>(); +mapping.put("7CTDM3D00BVY4C", "1581F6Q8D243100C605L"); +handler.setAirportDroneMapping(mapping); + +// 注册各种回调 +handler.registerAirportOsdCallback(new IAirportOsdCallback() { + @Override + public void onAirportOsdData(String airportSn, AirportOsdData data) { + // 处理机场OSD数据 + } +}); +``` + +### 4. 发送消息 + +```java +var client = manager.getClient(clientId); +client.publish("thing/product/7CTDM3D00BVY4C/services", message); +``` + +## 注意事项 + +1. 机场SN以 `7C` 开头,无人机SN以 `158` 开头 +2. 需要正确配置机场与无人机的SN映射关系 +3. 使用MQTT 5.0协议 (Eclipse Paho MQTT v5) +4. 支持共享订阅,可通过配置开启 diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IAirportOsdCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IAirportOsdCallback.java new file mode 100644 index 0000000..9867efa --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IAirportOsdCallback.java @@ -0,0 +1,13 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData; + +/** + * 机场OSD数据回调接口 + * + * @author ruoyi + */ +public interface IAirportOsdCallback { + + void onAirportOsdData(String airportSn, AirportOsdData data); +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IAirportStateCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IAirportStateCallback.java new file mode 100644 index 0000000..6e892fa --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IAirportStateCallback.java @@ -0,0 +1,13 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData; + +/** + * 机场State数据回调接口 + * + * @author ruoyi + */ +public interface IAirportStateCallback { + + void onAirportStateData(String airportSn, String droneSn, AirportStateData data); +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IDroneOsdCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IDroneOsdCallback.java new file mode 100644 index 0000000..fe4576b --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IDroneOsdCallback.java @@ -0,0 +1,13 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData; + +/** + * 无人机OSD数据回调接口 + * + * @author ruoyi + */ +public interface IDroneOsdCallback { + + void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data); +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IEventsCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IEventsCallback.java new file mode 100644 index 0000000..ed257d7 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IEventsCallback.java @@ -0,0 +1,13 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData; + +/** + * 机场Events数据回调接口 + * + * @author ruoyi + */ +public interface IEventsCallback { + + void onEventsData(String airportSn, EventsData data); +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IRequestsCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IRequestsCallback.java new file mode 100644 index 0000000..7b06768 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IRequestsCallback.java @@ -0,0 +1,13 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import java.util.Map; + +/** + * 设备请求数据回调接口 + * + * @author ruoyi + */ +public interface IRequestsCallback { + + void onRequestsData(String airportSn, String method, Map data); +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IServicesReplyCallback.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IServicesReplyCallback.java new file mode 100644 index 0000000..c957e89 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/callback/IServicesReplyCallback.java @@ -0,0 +1,13 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.callback; + +import java.util.Map; + +/** + * 服务回复数据回调接口 + * + * @author ruoyi + */ +public interface IServicesReplyCallback { + + void onServicesReplyData(String airportSn, Map data); +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/config/TuohengMqttClientConfig.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/config/TuohengMqttClientConfig.java new file mode 100644 index 0000000..e1c42a3 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/config/TuohengMqttClientConfig.java @@ -0,0 +1,44 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.config; + +import lombok.Builder; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * Tuoheng MQTT客户端配置 + * + * @author ruoyi + */ +@Data +@Builder +public class TuohengMqttClientConfig { + + private String host; + + private Integer port; + + private String clientId; + + private String username; + + private String password; + + @Builder.Default + private Integer connectionTimeout = 30; + + @Builder.Default + private Integer keepAliveInterval = 60; + + @Builder.Default + private Boolean autoReconnect = true; + + @Builder.Default + private Boolean cleanSession = true; + + @Builder.Default + private Boolean useSharedSubscription = true; + + @Builder.Default + private String sharedGroupName = "tuoheng-group"; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/example/TuohengMqttUsageExample.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/example/TuohengMqttUsageExample.java new file mode 100644 index 0000000..a5a04ac --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/example/TuohengMqttUsageExample.java @@ -0,0 +1,90 @@ +//package com.ruoyi.device.domain.impl.tuohengmqtt.example; +// +//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback; +//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback; +//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback; +//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback; +//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback; +//import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback; +//import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig; +//import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager; +//import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData; +//import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData; +//import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData; +//import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.stereotype.Component; +// +//import java.util.HashMap; +//import java.util.Map; +// +//@Slf4j +//@Component +//public class TuohengMqttUsageExample { +// +// @Autowired +// private TuohengMqttClientManager manager; +// +// public void initExample() { +// TuohengMqttClientConfig config = TuohengMqttClientConfig.builder() +// .host("your-mqtt-host") +// .port(1883) +// .clientId("tuoheng-client-1") +// .username("your-username") +// .password("your-password") +// .build(); +// +// String clientId = manager.createClient(config); +// log.info("创建MQTT客户端: {}", clientId); +// +// var handler = manager.getHandler(clientId); +// +// Map mapping = new HashMap<>(); +// mapping.put("7CTDM3D00BVY4C", "1581F6Q8D243100C605L"); +// handler.setAirportDroneMapping(mapping); +// +// handler.registerAirportOsdCallback(new IAirportOsdCallback() { +// @Override +// public void onAirportOsdData(String airportSn, AirportOsdData data) { +// log.info("收到机场OSD数据 - AirportSN: {}, Temperature: {}", airportSn, data.getTemperature()); +// } +// }); +// +// handler.registerDroneOsdCallback(new IDroneOsdCallback() { +// @Override +// public void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data) { +// log.info("收到无人机OSD数据 - DroneSN: {}, AirportSN: {}, Latitude: {}, Longitude: {}", +// droneSn, airportSn, data.getLatitude(), data.getLongitude()); +// } +// }); +// +// handler.registerAirportStateCallback(new IAirportStateCallback() { +// @Override +// public void onAirportStateData(String airportSn, String droneSn, AirportStateData data) { +// log.info("收到机场State数据 - AirportSN: {}, DroneSN: {}", airportSn, droneSn); +// } +// }); +// +// handler.registerEventsCallback(new IEventsCallback() { +// @Override +// public void onEventsData(String airportSn, EventsData data) { +// log.info("收到Events数据 - AirportSN: {}, Event: {}", airportSn, data.getEvent()); +// } +// }); +// +// handler.registerServicesReplyCallback(new IServicesReplyCallback() { +// @Override +// public void onServicesReplyData(String airportSn, Map data) { +// log.info("收到ServicesReply数据 - AirportSN: {}", airportSn); +// } +// }); +// +// handler.registerRequestsCallback(new IRequestsCallback() { +// @Override +// public void onRequestsData(String airportSn, String method, Map data) { +// log.info("收到Requests数据 - AirportSN: {}, Method: {}", airportSn, method); +// } +// }); +// } +//} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java new file mode 100644 index 0000000..c94a87b --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java @@ -0,0 +1,288 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.handler; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengMqttMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +@Slf4j +@Component +public class TuohengMqttMessageHandler { + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private final List airportOsdCallbacks = new ArrayList<>(); + + private final List droneOsdCallbacks = new ArrayList<>(); + + private final List airportStateCallbacks = new ArrayList<>(); + + private final List eventsCallbacks = new ArrayList<>(); + + private final List servicesReplyCallbacks = new ArrayList<>(); + + private final List requestsCallbacks = new ArrayList<>(); + + private static final Pattern DRONE_SN_PATTERN = Pattern.compile("^158[0-9A-Z]+$"); + + private static final Pattern AIRPORT_SN_PATTERN = Pattern.compile("^7C[0-9A-Z]+$"); + + private Map airportDroneMapping; + + public void setAirportDroneMapping(Map mapping) { + this.airportDroneMapping = mapping; + } + + public void registerAirportOsdCallback(IAirportOsdCallback callback) { + if (callback != null && !airportOsdCallbacks.contains(callback)) { + airportOsdCallbacks.add(callback); + log.info("注册机场OSD数据回调: {}", callback.getClass().getSimpleName()); + } + } + + public void registerDroneOsdCallback(IDroneOsdCallback callback) { + if (callback != null && !droneOsdCallbacks.contains(callback)) { + droneOsdCallbacks.add(callback); + log.info("注册无人机OSD数据回调: {}", callback.getClass().getSimpleName()); + } + } + + public void registerAirportStateCallback(IAirportStateCallback callback) { + if (callback != null && !airportStateCallbacks.contains(callback)) { + airportStateCallbacks.add(callback); + log.info("注册机场State数据回调: {}", callback.getClass().getSimpleName()); + } + } + + public void registerEventsCallback(IEventsCallback callback) { + if (callback != null && !eventsCallbacks.contains(callback)) { + eventsCallbacks.add(callback); + log.info("注册Events数据回调: {}", callback.getClass().getSimpleName()); + } + } + + public void registerServicesReplyCallback(IServicesReplyCallback callback) { + if (callback != null && !servicesReplyCallbacks.contains(callback)) { + servicesReplyCallbacks.add(callback); + log.info("注册ServicesReply数据回调: {}", callback.getClass().getSimpleName()); + } + } + + public void registerRequestsCallback(IRequestsCallback callback) { + if (callback != null && !requestsCallbacks.contains(callback)) { + requestsCallbacks.add(callback); + log.info("注册Requests数据回调: {}", callback.getClass().getSimpleName()); + } + } + + public void handleMessage(String topic, String payload) { + try { + log.debug("收到MQTT消息 - Topic: {}, Payload: {}", topic, payload); + + String deviceSn = extractDeviceSnFromTopic(topic); + String messageType = extractMessageTypeFromTopic(topic); + + if (deviceSn == null || messageType == null) { + log.warn("无法从Topic解析设备SN或消息类型: {}", topic); + return; + } + + String droneSn = null; + String airportSn = null; + + if (isAirportSn(deviceSn)) { + airportSn = deviceSn; + if (airportDroneMapping != null) { + droneSn = airportDroneMapping.get(deviceSn); + } + } else if (isDroneSn(deviceSn)) { + droneSn = deviceSn; + } + + @SuppressWarnings("unchecked") + TuohengMqttMessage> message = objectMapper.readValue( + payload, + objectMapper.getTypeFactory().constructParametricType( + TuohengMqttMessage.class, + Map.class + ) + ); + + if ("osd".equals(messageType)) { + handleOsdMessage(deviceSn, droneSn, airportSn, message); + } else if ("state".equals(messageType)) { + handleStateMessage(deviceSn, droneSn, airportSn, message); + } else if ("events".equals(messageType)) { + handleEventsMessage(deviceSn, message); + } else if ("services_reply".equals(messageType)) { + handleServicesReplyMessage(deviceSn, message); + } else if ("requests".equals(messageType)) { + handleRequestsMessage(deviceSn, message); + } else if ("status".equals(messageType)) { + handleStatusMessage(deviceSn, message); + } + + } catch (Exception e) { + log.error("处理MQTT消息失败 - Topic: {}, Error: {}", topic, e.getMessage(), e); + } + } + + private void handleOsdMessage(String deviceSn, String droneSn, String airportSn, TuohengMqttMessage> message) { + try { + if (isDroneSn(deviceSn) && droneSn != null) { + DroneOsdData droneOsdData = objectMapper.convertValue(message.getData(), DroneOsdData.class); + log.debug("处理无人机OSD数据 - SN: {}, Airport: {}", droneSn, airportSn); + + for (IDroneOsdCallback callback : droneOsdCallbacks) { + try { + callback.onDroneOsdData(droneSn, airportSn, droneOsdData); + } catch (Exception e) { + log.error("无人机OSD数据回调执行失败: {}", e.getMessage(), e); + } + } + } else if (isAirportSn(deviceSn) && airportSn != null) { + AirportOsdData airportOsdData = objectMapper.convertValue(message.getData(), AirportOsdData.class); + log.debug("处理机场OSD数据 - SN: {}", airportSn); + + for (IAirportOsdCallback callback : airportOsdCallbacks) { + try { + callback.onAirportOsdData(airportSn, airportOsdData); + } catch (Exception e) { + log.error("机场OSD数据回调执行失败: {}", e.getMessage(), e); + } + } + } + } catch (Exception e) { + log.error("处理OSD消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + + private void handleStateMessage(String deviceSn, String droneSn, String airportSn, TuohengMqttMessage> message) { + try { + if (isAirportSn(deviceSn) && airportSn != null) { + AirportStateData airportStateData = objectMapper.convertValue(message.getData(), AirportStateData.class); + log.debug("处理机场State数据 - SN: {}, Drone: {}", airportSn, droneSn); + + for (IAirportStateCallback callback : airportStateCallbacks) { + try { + callback.onAirportStateData(airportSn, droneSn, airportStateData); + } catch (Exception e) { + log.error("机场State数据回调执行失败: {}", e.getMessage(), e); + } + } + } + } catch (Exception e) { + log.error("处理State消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + + private void handleEventsMessage(String deviceSn, TuohengMqttMessage> message) { + try { + if (isAirportSn(deviceSn)) { + EventsData eventsData = objectMapper.convertValue(message.getData(), EventsData.class); + log.debug("处理Events数据 - SN: {}", deviceSn); + + for (IEventsCallback callback : eventsCallbacks) { + try { + callback.onEventsData(deviceSn, eventsData); + } catch (Exception e) { + log.error("Events数据回调执行失败: {}", e.getMessage(), e); + } + } + } + } catch (Exception e) { + log.error("处理Events消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + + private void handleServicesReplyMessage(String deviceSn, TuohengMqttMessage> message) { + try { + if (isAirportSn(deviceSn)) { + log.debug("处理ServicesReply数据 - SN: {}, Method: {}", deviceSn, message.getMethod()); + + for (IServicesReplyCallback callback : servicesReplyCallbacks) { + try { + callback.onServicesReplyData(deviceSn, message.getData()); + } catch (Exception e) { + log.error("ServicesReply数据回调执行失败: {}", e.getMessage(), e); + } + } + } + } catch (Exception e) { + log.error("处理ServicesReply消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + + private void handleRequestsMessage(String deviceSn, TuohengMqttMessage> message) { + try { + if (isAirportSn(deviceSn)) { + log.debug("处理Requests数据 - SN: {}, Method: {}", deviceSn, message.getMethod()); + + for (IRequestsCallback callback : requestsCallbacks) { + try { + callback.onRequestsData(deviceSn, message.getMethod(), message.getData()); + } catch (Exception e) { + log.error("Requests数据回调执行失败: {}", e.getMessage(), e); + } + } + } + } catch (Exception e) { + log.error("处理Requests消息失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); + } + } + + private void handleStatusMessage(String deviceSn, TuohengMqttMessage> message) { + log.debug("处理Status数据 - SN: {}", deviceSn); + } + + private String extractDeviceSnFromTopic(String topic) { + if (topic == null) { + return null; + } + String[] parts = topic.split("/"); + if (parts.length >= 3) { + return parts[2]; + } + return null; + } + + private String extractMessageTypeFromTopic(String topic) { + if (topic == null) { + return null; + } + String[] parts = topic.split("/"); + if (parts.length >= 4) { + return parts[3]; + } + return null; + } + + private boolean isDroneSn(String sn) { + if (sn == null) { + return false; + } + return DRONE_SN_PATTERN.matcher(sn).matches(); + } + + private boolean isAirportSn(String sn) { + if (sn == null) { + return false; + } + return AIRPORT_SN_PATTERN.matcher(sn).matches(); + } +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/manager/TuohengMqttClientManager.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/manager/TuohengMqttClientManager.java new file mode 100644 index 0000000..dedd6b2 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/manager/TuohengMqttClientManager.java @@ -0,0 +1,47 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.manager; + +import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig; +import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler; +import com.ruoyi.device.domain.impl.tuohengmqtt.service.TuohengMqttClientService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class TuohengMqttClientManager { + + private TuohengMqttClientService clientService; + private TuohengMqttMessageHandler messageHandler; + + public void initClient(TuohengMqttClientConfig config) { + if (clientService != null && clientService.isConnected()) { + log.info("MQTT客户端已连接,无需重复创建"); + return; + } + + messageHandler = new TuohengMqttMessageHandler(); + clientService = new TuohengMqttClientService(config, messageHandler); + + clientService.connect(); + log.info("MQTT客户端创建并连接成功: {}", config.getClientId()); + } + + public TuohengMqttMessageHandler getHandler() { + return messageHandler; + } + + public TuohengMqttClientService getClient() { + return clientService; + } + + public void disconnect() { + if (clientService != null) { + clientService.disconnect(); + log.info("MQTT客户端已断开连接"); + } + } + + public boolean isConnected() { + return clientService != null && clientService.isConnected(); + } +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/AirportOsdData.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/AirportOsdData.java new file mode 100644 index 0000000..4b7698e --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/AirportOsdData.java @@ -0,0 +1,66 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * 机场OSD数据 + * + * @author ruoyi + */ +@Data +public class AirportOsdData { + + @JsonProperty("working_current") + private String workingCurrent; + + @JsonProperty("drc_state") + private String drcState; + + @JsonProperty("longitude") + private String longitude; + + @JsonProperty("latitude") + private String latitude; + + @JsonProperty("height") + private String height; + + @JsonProperty("air_conditioner") + private Map airConditioner; + + @JsonProperty("humidity") + private String humidity; + + @JsonProperty("temperature") + private String temperature; + + @JsonProperty("environment_temperature") + private String environmentTemperature; + + @JsonProperty("wind_speed") + private String windSpeed; + + @JsonProperty("rainfall") + private String rainfall; + + @JsonProperty("drone_in_dock") + private String droneInDock; + + @JsonProperty("cover_state") + private String coverState; + + @JsonProperty("flighttask_step_code") + private String flighttaskStepCode; + + @JsonProperty("mode_code") + private String modeCode; + + @JsonProperty("position_state") + private Map positionState; + + @JsonProperty("drone_battery_maintenance_info") + private Map droneBatteryMaintenanceInfo; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/AirportStateData.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/AirportStateData.java new file mode 100644 index 0000000..23e5676 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/AirportStateData.java @@ -0,0 +1,21 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * 机场State数据 + * + * @author ruoyi + */ +@Data +public class AirportStateData { + + @JsonProperty("live_status") + private String liveStatus; + + @JsonProperty("live_capacity") + private String liveCapacity; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/DroneOsdData.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/DroneOsdData.java new file mode 100644 index 0000000..b1c16cf --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/DroneOsdData.java @@ -0,0 +1,57 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * 无人机OSD数据 + * + * @author ruoyi + */ +@Data +public class DroneOsdData { + + @JsonProperty("longitude") + private String longitude; + + @JsonProperty("latitude") + private String latitude; + + @JsonProperty("elevation") + private String elevation; + + @JsonProperty("height") + private String height; + + @JsonProperty("wind_speed") + private String windSpeed; + + @JsonProperty("wind_direction") + private String windDirection; + + @JsonProperty("position_state") + private Map positionState; + + @JsonProperty("low_battery_warning_threshold") + private String lowBatteryWarningThreshold; + + @JsonProperty("battery") + private Map battery; + + @JsonProperty("mode_code") + private String modeCode; + + @JsonProperty("horizontal_speed") + private String horizontalSpeed; + + @JsonProperty("vertical_speed") + private String verticalSpeed; + + @JsonProperty("home_distance") + private String homeDistance; + + @JsonProperty("cameras") + private String cameras; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/EventsData.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/EventsData.java new file mode 100644 index 0000000..d86af17 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/EventsData.java @@ -0,0 +1,24 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.util.Map; + +/** + * 机场Events数据 + * + * @author ruoyi + */ +@Data +public class EventsData { + + @JsonProperty("event") + private String event; + + @JsonProperty("result") + private Integer result; + + @JsonProperty("output") + private Map output; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/TuohengMqttMessage.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/TuohengMqttMessage.java new file mode 100644 index 0000000..a1f0a13 --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/model/TuohengMqttMessage.java @@ -0,0 +1,37 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +/** + * Tuoheng MQTT消息基础结构 + * + * @author ruoyi + */ +@Data +public class TuohengMqttMessage { + + @JsonProperty("tid") + private String tid; + + @JsonProperty("bid") + private String bid; + + @JsonProperty("timestamp") + private Long timestamp; + + @JsonProperty("method") + private String method; + + @JsonProperty("data") + private T data; + + @JsonProperty("gateway") + private String gateway; + + @JsonProperty("source") + private String source; + + @JsonProperty("need_reply") + private Integer needReply; +} diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java new file mode 100644 index 0000000..c86e0df --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/service/TuohengMqttClientService.java @@ -0,0 +1,202 @@ +package com.ruoyi.device.domain.impl.tuohengmqtt.service; + +import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig; +import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; + +@Slf4j +public class TuohengMqttClientService { + + private final TuohengMqttClientConfig config; + private final TuohengMqttMessageHandler messageHandler; + private MqttClient mqttClient; + + private static final String AIRPORT_OSD_TOPIC = "thing/product/+/osd"; + private static final String AIRPORT_STATE_TOPIC = "thing/product/+/state"; + private static final String AIRPORT_EVENTS_TOPIC = "thing/product/+/events"; + private static final String SERVICES_REPLY_TOPIC = "thing/product/+/services_reply"; + private static final String STATUS_TOPIC = "sys/product/+/status"; + private static final String REQUESTS_TOPIC = "thing/product/+/requests"; + + public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) { + this.config = config; + this.messageHandler = messageHandler; + } + + public void connect() { + try { + if (mqttClient != null && mqttClient.isConnected()) { + log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId()); + return; + } + + String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort()); + log.info("开始连接Tuoheng MQTT服务器[{}]: {}", config.getClientId(), broker); + + mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence()); + + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setUserName(config.getUsername()); + options.setPassword(config.getPassword().getBytes()); + options.setConnectionTimeout(config.getConnectionTimeout()); + options.setKeepAliveInterval(config.getKeepAliveInterval()); + options.setAutomaticReconnect(config.getAutoReconnect()); + options.setCleanStart(config.getCleanSession()); + + mqttClient.setCallback(new MqttCallback() { + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(), + disconnectResponse.getReasonString()); + + if (config.getAutoReconnect()) { + log.info("MQTT客户端[{}]将自动重连...", config.getClientId()); + } + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + log.error("MQTT客户端[{}]发生错误: {}", config.getClientId(), + exception.getMessage(), exception); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + try { + String payload = new String(message.getPayload()); + messageHandler.handleMessage(topic, payload); + } catch (Exception e) { + log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), + e.getMessage(), e); + } + } + + @Override + public void deliveryComplete(IMqttToken token) { + + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + if (reconnect) { + log.info("MQTT客户端[{}]重连成功: {}", config.getClientId(), serverURI); + subscribe(); + } else { + log.info("MQTT客户端[{}]首次连接成功: {}", config.getClientId(), serverURI); + } + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) { + + } + }); + + mqttClient.connect(options); + log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId()); + + subscribe(); + + } catch (Exception e) { + log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e); + } + } + + private void subscribe() { + try { + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId()); + return; + } + + String airportOsdTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_OSD_TOPIC) + : AIRPORT_OSD_TOPIC; + + String airportStateTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_STATE_TOPIC) + : AIRPORT_STATE_TOPIC; + + String airportEventsTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), AIRPORT_EVENTS_TOPIC) + : AIRPORT_EVENTS_TOPIC; + + String servicesReplyTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), SERVICES_REPLY_TOPIC) + : SERVICES_REPLY_TOPIC; + + String statusTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), STATUS_TOPIC) + : STATUS_TOPIC; + + String requestsTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), REQUESTS_TOPIC) + : REQUESTS_TOPIC; + + mqttClient.subscribe(airportOsdTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportOsdTopic); + + mqttClient.subscribe(airportStateTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportStateTopic); + + mqttClient.subscribe(airportEventsTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), airportEventsTopic); + + mqttClient.subscribe(servicesReplyTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), servicesReplyTopic); + + mqttClient.subscribe(statusTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), statusTopic); + + mqttClient.subscribe(requestsTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), requestsTopic); + + } catch (Exception e) { + log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e); + } + } + + public void disconnect() { + try { + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.disconnect(); + mqttClient.close(); + log.info("MQTT客户端[{}]已断开连接", config.getClientId()); + } + } catch (Exception e) { + log.error("MQTT客户端[{}]断开连接失败: {}", config.getClientId(), e.getMessage(), e); + } + } + + public boolean isConnected() { + return mqttClient != null && mqttClient.isConnected(); + } + + public String getClientId() { + return config.getClientId(); + } + + public void publish(String topic, String message) { + try { + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("MQTT客户端[{}]未连接,无法发送消息", config.getClientId()); + return; + } + MqttMessage mqttMessage = new MqttMessage(message.getBytes()); + mqttMessage.setQos(1); + mqttClient.publish(topic, mqttMessage); + log.debug("MQTT客户端[{}]发送消息 - Topic: {}, Message: {}", config.getClientId(), topic, message); + } catch (MqttException e) { + log.error("MQTT客户端[{}]发送消息失败 - Topic: {}, Error: {}", config.getClientId(), topic, e.getMessage(), e); + } + } +} diff --git a/src/main/java/com/ruoyi/device/service/config/TuohengMqttProperties.java b/src/main/java/com/ruoyi/device/service/config/TuohengMqttProperties.java new file mode 100644 index 0000000..a0bfe8e --- /dev/null +++ b/src/main/java/com/ruoyi/device/service/config/TuohengMqttProperties.java @@ -0,0 +1,66 @@ +package com.ruoyi.device.service.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * DJI MQTT配置属性 + * + * @author ruoyi + */ +@Data +@Component +@ConfigurationProperties(prefix = "tuoheng.mqtt") +public class TuohengMqttProperties { + + /** + * MQTT服务器地址 + */ + private String host = "mqtt.t-aaron.com"; + + /** + * MQTT服务器端口 + */ + private Integer port = 10883; + + /** + * MQTT协议版本 + */ + private Integer version = 5; + + /** + * 客户端ID + */ + private String clientId = "ThingsBoard_gateway"; + + /** + * 用户名 + */ + private String username = "admin"; + + /** + * 密码 + */ + private String password = "admin"; + + /** + * 连接超时时间(秒) + */ + private Integer connectionTimeout = 30; + + /** + * 保持连接时间(秒) + */ + private Integer keepAliveInterval = 60; + + /** + * 自动重连 + */ + private Boolean autoReconnect = true; + + /** + * 清除会话 + */ + private Boolean cleanSession = false; +} diff --git a/src/main/java/com/ruoyi/device/service/impl/TuohengService.java b/src/main/java/com/ruoyi/device/service/impl/TuohengService.java new file mode 100644 index 0000000..c1a7d83 --- /dev/null +++ b/src/main/java/com/ruoyi/device/service/impl/TuohengService.java @@ -0,0 +1,202 @@ +package com.ruoyi.device.service.impl; + +import com.ruoyi.device.domain.api.IDockAircraftDomain; +import com.ruoyi.device.domain.api.IDockDomain; +import com.ruoyi.device.domain.api.IAircraftDomain; +import com.ruoyi.device.domain.api.IDeviceDomain; +import com.ruoyi.device.domain.model.DockAircraft; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportOsdCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IAirportStateCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IEventsCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneOsdCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRequestsCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IServicesReplyCallback; +import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig; +import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler; +import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportStateData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.DroneOsdData; +import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData; +import com.ruoyi.device.domain.model.Aircraft; +import com.ruoyi.device.domain.model.Device; +import com.ruoyi.device.domain.model.Dock; +import com.ruoyi.device.service.config.TuohengMqttProperties; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +@Slf4j +public class TuohengService { + + @Autowired + private TuohengMqttClientManager clientManager; + + @Autowired + private TuohengMqttProperties mqttProperties; + + @Autowired + private IDockAircraftDomain dockAircraftDomain; + + @Autowired + private IDockDomain dockDomain; + + @Autowired + private IAircraftDomain aircraftDomain; + + @Autowired + private IDeviceDomain deviceDomain; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationReady() { + TuohengMqttClientConfig config = TuohengMqttClientConfig.builder() + .host(mqttProperties.getHost()) + .port(mqttProperties.getPort()) + .clientId(mqttProperties.getClientId()) + .username(mqttProperties.getUsername()) + .password(mqttProperties.getPassword()) + .connectionTimeout(mqttProperties.getConnectionTimeout()) + .keepAliveInterval(mqttProperties.getKeepAliveInterval()) + .autoReconnect(mqttProperties.getAutoReconnect()) + .cleanSession(mqttProperties.getCleanSession()) + .useSharedSubscription(true) + .sharedGroupName("tuoheng-group") + .build(); + + clientManager.initClient(config); + + TuohengMqttMessageHandler handler = clientManager.getHandler(); + + Map mapping = loadAirportDroneMapping(); + handler.setAirportDroneMapping(mapping); + + handler.registerAirportOsdCallback(new IAirportOsdCallback() { + @Override + public void onAirportOsdData(String airportSn, AirportOsdData data) { + log.info("========== 收到机场OSD数据 =========="); + log.info("机场SN: {}", airportSn); + try { + log.info("数据内容: {}", objectMapper.writeValueAsString(data)); + } catch (Exception e) { + log.error("序列化数据失败", e); + } + log.info("====================================="); + } + }); + + handler.registerDroneOsdCallback(new IDroneOsdCallback() { + @Override + public void onDroneOsdData(String droneSn, String airportSn, DroneOsdData data) { + log.info("========== 收到无人机OSD数据 =========="); + log.info("无人机SN: {}, 机场SN: {}", droneSn, airportSn); + try { + log.info("数据内容: {}", objectMapper.writeValueAsString(data)); + } catch (Exception e) { + log.error("序列化数据失败", e); + } + log.info("====================================="); + } + }); + + handler.registerAirportStateCallback(new IAirportStateCallback() { + @Override + public void onAirportStateData(String airportSn, String droneSn, AirportStateData data) { + log.info("========== 收到机场State数据 =========="); + log.info("机场SN: {}, 无人机SN: {}", airportSn, droneSn); + try { + log.info("数据内容: {}", objectMapper.writeValueAsString(data)); + } catch (Exception e) { + log.error("序列化数据失败", e); + } + log.info("====================================="); + } + }); + + handler.registerEventsCallback(new IEventsCallback() { + @Override + public void onEventsData(String airportSn, EventsData data) { + log.info("========== 收到Events数据 =========="); + log.info("机场SN: {}", airportSn); + try { + log.info("数据内容: {}", objectMapper.writeValueAsString(data)); + } catch (Exception e) { + log.error("序列化数据失败", e); + } + log.info("====================================="); + } + }); + + handler.registerServicesReplyCallback(new IServicesReplyCallback() { + @Override + public void onServicesReplyData(String airportSn, Map data) { + log.info("========== 收到ServicesReply数据 =========="); + log.info("机场SN: {}", airportSn); + try { + log.info("数据内容: {}", objectMapper.writeValueAsString(data)); + } catch (Exception e) { + log.error("序列化数据失败", e); + } + log.info("====================================="); + } + }); + + handler.registerRequestsCallback(new IRequestsCallback() { + @Override + public void onRequestsData(String airportSn, String method, Map data) { + log.info("========== 收到Requests数据 =========="); + log.info("机场SN: {}, Method: {}", airportSn, method); + try { + log.info("数据内容: {}", objectMapper.writeValueAsString(data)); + } catch (Exception e) { + log.error("序列化数据失败", e); + } + log.info("====================================="); + } + }); + + log.info("TuohengService 初始化完成,已注册所有回调"); + } + + private Map loadAirportDroneMapping() { + Map mapping = new HashMap<>(); + + try { + List dockAircraftList = dockAircraftDomain.selectDockAircraftList(new DockAircraft()); + + for (DockAircraft dockAircraft : dockAircraftList) { + Dock dock = dockDomain.selectDockByDockId(dockAircraft.getDockId()); + Aircraft aircraft = aircraftDomain.selectAircraftByAircraftId(dockAircraft.getAircraftId()); + + if (dock != null && aircraft != null) { + Device dockDevice = deviceDomain.selectDeviceByDeviceId(dock.getDeviceId()); + Device aircraftDevice = deviceDomain.selectDeviceByDeviceId(aircraft.getDeviceId()); + + if (dockDevice != null && aircraftDevice != null) { + String airportSn = dockDevice.getDeviceSn(); + String droneSn = aircraftDevice.getDeviceSn(); + if (airportSn != null && droneSn != null) { + mapping.put(airportSn, droneSn); + log.info("加载机场-无人机映射: {} -> {}", airportSn, droneSn); + } + } + } + } + + log.info("从数据库加载机场-无人机映射完成,共 {} 条记录", mapping.size()); + } catch (Exception e) { + log.error("从数据库加载机场-无人机映射失败", e); + } + + return mapping; + } +} diff --git a/src/main/resources/bootstrap.yml b/src/main/resources/bootstrap.yml index ef68131..70e0b2e 100644 --- a/src/main/resources/bootstrap.yml +++ b/src/main/resources/bootstrap.yml @@ -44,6 +44,18 @@ machine: # DJI MQTT配置 dji: + mqtt: + host: mqtt.t-aaron.com + port: 10883 + version: 5 + client-id: mqttx_c1c67436 + username: admin + password: admin + connection-timeout: 30 + keep-alive-interval: 60 + auto-reconnect: true + clean-session: false +tuoheng: mqtt: host: mqtt.t-aaron.com port: 10883