Compare commits

..

No commits in common. "f62d2798f6ca0e0561435f5e016f6fa8485a0164" and "5d4403f55c4337c56f650ff8a9e77ee35f3d17b7" have entirely different histories.

2 changed files with 101 additions and 80 deletions

View File

@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static com.ruoyi.device.domain.impl.tuohengmqtt.service.TuohengMqttClientService.*;
@Slf4j @Slf4j
@Component @Component
public class TuohengMqttMessageHandler { public class TuohengMqttMessageHandler {
@ -41,7 +39,6 @@ public class TuohengMqttMessageHandler {
private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+"); private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+");
/** /**
* 设置 MQTT 回调注册中心 * 设置 MQTT 回调注册中心
*/ */
@ -103,18 +100,29 @@ public class TuohengMqttMessageHandler {
try { try {
log.debug("收到MQTT消息 - Topic: {}", topic); log.debug("收到MQTT消息 - Topic: {}", topic);
// 如果是 confirm 消息打印详细日志
// if (topic.contains("/control/confirm")) {
// log.info("【收到confirm消息】Topic: {}, Payload: {}", topic, payload);
// }
// 通知 MqttCallbackRegistry 处理回调用于指令回调 // 通知 MqttCallbackRegistry 处理回调用于指令回调
if (machineCallBackRegistry != null) { if (machineCallBackRegistry != null) {
try { try {
// payload 解析为 JSON 对象传递给回调注册中心 // payload 解析为 JSON 对象传递给回调注册中心
Object messageBody = objectMapper.readValue(payload, Object.class); Object messageBody = objectMapper.readValue(payload, Object.class);
machineCallBackRegistry.handleMessage(topic, messageBody); machineCallBackRegistry.handleMessage(topic, messageBody);
// // 如果是 confirm 消息打印回调处理结果
// if (topic.contains("/control/confirm")) {
// log.info("【confirm消息已传递给回调注册中心】Topic: {}", topic);
// }
} catch (Exception e) { } catch (Exception e) {
log.debug("通知回调注册中心失败: {}", e.getMessage()); log.debug("通知回调注册中心失败: {}", e.getMessage());
} }
} }
String deviceSn = extractDeviceSnFromTopic(topic); String deviceSn = extractDeviceSnFromTopic(topic);
if (deviceSn == null) { if (deviceSn == null) {
log.warn("无法从Topic解析设备SN: {}", topic); log.warn("无法从Topic解析设备SN: {}", topic);
return; return;
@ -127,29 +135,34 @@ public class TuohengMqttMessageHandler {
} }
} }
// 根据Topic模式匹配分发到不同的处理方法 String messageType = extractMessageTypeFromTopic(topic);
if (matchTopic(topic, AIRPORT_NEST_REALTIME_TOPIC)) {
handleAirportNestRealTimeData(deviceSn, payload); if (messageType == null) {
} else if (matchTopic(topic, AIRPORT_NEST_BASIC_TOPIC)) { log.warn("无法从Topic解析消息类型: {}", topic);
return;
}
switch (messageType) {
case "realTime/data":
handleRealTimeData(deviceSn, payload, topic);
break;
case "realTime/basic":
handleRealTimeBasicData(deviceSn, payload); handleRealTimeBasicData(deviceSn, payload);
} else if (matchTopic(topic, AIRPORT_NEST_CONFIRM_TOPIC)) { break;
// 机场控制确认消息已由 MqttCallbackRegistry 处理 case "heartbeat/message":
log.debug("机场控制确认消息 - SN: {}", deviceSn);
} else if (matchTopic(topic, AIRPORT_DRONE_REALTIME_TOPIC)) {
handleAirportDroneRealTimeData(deviceSn, payload);
} else if (matchTopic(topic, AIRPORT_FLY_DATA_TOPIC)) {
handleAirportFlyControlData(deviceSn, payload, topic);
} else if (matchTopic(topic, AIRPORT_FLY_CONFIRM_TOPIC)) {
// 飞行控制确认消息已由 MqttCallbackRegistry 处理
log.debug("飞行控制确认消息 - SN: {}", deviceSn);
} else if (matchTopic(topic, HEARTBEAT_MESSAGE_TOPIC)) {
handleHeartbeatMessage(deviceSn, payload); handleHeartbeatMessage(deviceSn, payload);
} else if (matchTopic(topic, PRODUCT_OSD_TOPIC)) { break;
case "osd":
handleOsdData(deviceSn, payload); handleOsdData(deviceSn, payload);
} else if (matchTopic(topic, PRODUCT_EVENTS_TOPIC)) { break;
case "events":
handleEventsData(deviceSn, payload); handleEventsData(deviceSn, payload);
} else { break;
log.debug("未知的消息类型 - Topic: {}", topic); case "control/data":
handleAirportFlyControlData(deviceSn, payload, topic);
break;
default:
log.debug("未知消息类型: {}", messageType);
} }
} catch (Exception e) { } catch (Exception e) {
@ -157,8 +170,9 @@ public class TuohengMqttMessageHandler {
} }
} }
private void handleAirportNestRealTimeData(String deviceSn, String payload) { private void handleRealTimeData(String deviceSn, String payload, String topic) {
try { try {
if (topic.contains("airportNest")) {
TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class); TuohengRealTimeData data = objectMapper.readValue(payload, TuohengRealTimeData.class);
log.debug("处理机场实时数据 - 设备SN: {}", deviceSn); log.debug("处理机场实时数据 - 设备SN: {}", deviceSn);
for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) { for (ITuohengRealTimeDataCallback callback : realTimeDataCallbacks) {
@ -168,13 +182,7 @@ public class TuohengMqttMessageHandler {
log.error("实时数据回调执行失败: {}", e.getMessage(), e); log.error("实时数据回调执行失败: {}", e.getMessage(), e);
} }
} }
} catch (Exception e) { } else if (topic.contains("airportDrone")) {
log.error("处理机场实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
}
}
private void handleAirportDroneRealTimeData(String deviceSn, String payload) {
try {
DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class); DroneRealTimeData data = objectMapper.readValue(payload, DroneRealTimeData.class);
log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn); log.debug("处理无人机实时数据 - 设备SN: {}", deviceSn);
for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) { for (IDroneRealTimeCallback callback : droneRealTimeCallbacks) {
@ -184,8 +192,9 @@ public class TuohengMqttMessageHandler {
log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e); log.error("无人机实时数据回调执行失败: {}", e.getMessage(), e);
} }
} }
}
} catch (Exception e) { } catch (Exception e) {
log.error("处理无人机实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e); log.error("处理实时数据失败 - SN: {}, Error: {}", deviceSn, e.getMessage(), e);
} }
} }
@ -259,7 +268,7 @@ public class TuohengMqttMessageHandler {
private void handleAirportFlyControlData(String deviceSn, String payload, String topic) { private void handleAirportFlyControlData(String deviceSn, String payload, String topic) {
try { try {
log.info("处理机场飞行控制数据 - 设备SN: {}, Topic: {}", deviceSn, topic); log.debug("处理机场飞行控制数据 - 设备SN: {}, Topic: {}", deviceSn, topic);
for (IAirportFlyControlDataCallback callback : airportFlyControlDataCallbacks) { for (IAirportFlyControlDataCallback callback : airportFlyControlDataCallbacks) {
try { try {
@ -296,6 +305,37 @@ public class TuohengMqttMessageHandler {
return null; return null;
} }
private String extractMessageTypeFromTopic(String topic) {
if (topic == null) {
return null;
}
String[] parts = topic.split("/");
// /topic/v1/heartbeat/{deviceSn}/message
// parts[0]="", parts[1]="topic", parts[2]="v1", parts[3]="heartbeat",
// parts[4]=deviceSn, parts[5]="message"
if (topic.startsWith("/topic/v1/heartbeat/")) {
if (parts.length >= 6) {
return "heartbeat/message";
}
}
// /topic/v1/airportNest/{deviceSn}/realTime/data
// parts[0]="", parts[1]="topic", parts[2]="v1", parts[3]="airportNest",
// parts[4]=deviceSn, parts[5]="realTime", parts[6]="data"
else if (topic.startsWith("/topic/v1/")) {
if (parts.length >= 7) {
return parts[5] + "/" + parts[6]; // "realTime/data" or "realTime/basic"
}
}
// thing/product/{deviceSn}/osd
// parts[0]="thing", parts[1]="product", parts[2]=deviceSn, parts[3]="osd"
else if (topic.startsWith("thing/product/")) {
if (parts.length >= 4) {
return parts[3]; // "osd" or "events"
}
}
return null;
}
private boolean isProductTopic(String topic) { private boolean isProductTopic(String topic) {
return topic != null && topic.startsWith("thing/product/"); return topic != null && topic.startsWith("thing/product/");
@ -307,19 +347,4 @@ public class TuohengMqttMessageHandler {
} }
return TUOHENG_SN_PATTERN.matcher(sn).matches(); return TUOHENG_SN_PATTERN.matcher(sn).matches();
} }
/**
* 匹配Topic模式
* @param topic 实际topic
* @param pattern 模式topic支持+通配符
* @return 是否匹配
*/
private boolean matchTopic(String topic, String pattern) {
if (topic == null || pattern == null) {
return false;
}
// 将模式中的+替换为正则表达式匹配除斜杠外的任意字符
String regex = pattern.replace("+", "[^/]+");
return topic.matches(regex);
}
} }

View File

@ -20,15 +20,13 @@ public class TuohengMqttClientService {
private final TuohengMqttMessageHandler messageHandler; private final TuohengMqttMessageHandler messageHandler;
private MqttClient mqttClient; private MqttClient mqttClient;
public static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data"; private static final String AIRPORT_NEST_REALTIME_TOPIC = "/topic/v1/airportNest/+/realTime/data";
public static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic"; private static final String AIRPORT_NEST_BASIC_TOPIC = "/topic/v1/airportNest/+/realTime/basic";
public static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm"; private static final String AIRPORT_NEST_CONFIRM_TOPIC = "/topic/v1/airportNest/+/control/confirm";
public static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data"; private static final String AIRPORT_DRONE_REALTIME_TOPIC = "/topic/v1/airportDrone/+/realTime/data";
public static final String AIRPORT_FLY_DATA_TOPIC = "/topic/v1/airportFly/+/control/data"; private static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message";
public static final String AIRPORT_FLY_CONFIRM_TOPIC = "/topic/v1/airportFly/+/control/confirm"; private static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd";
public static final String HEARTBEAT_MESSAGE_TOPIC = "/topic/v1/heartbeat/+/message"; private static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events";
public static final String PRODUCT_OSD_TOPIC = "thing/product/+/osd";
public static final String PRODUCT_EVENTS_TOPIC = "thing/product/+/events";
public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) { public TuohengMqttClientService(TuohengMqttClientConfig config, TuohengMqttMessageHandler messageHandler) {
this.config = config; this.config = config;
@ -126,8 +124,6 @@ public class TuohengMqttClientService {
AIRPORT_NEST_BASIC_TOPIC, AIRPORT_NEST_BASIC_TOPIC,
AIRPORT_NEST_CONFIRM_TOPIC, AIRPORT_NEST_CONFIRM_TOPIC,
AIRPORT_DRONE_REALTIME_TOPIC, AIRPORT_DRONE_REALTIME_TOPIC,
AIRPORT_FLY_DATA_TOPIC,
AIRPORT_FLY_CONFIRM_TOPIC,
HEARTBEAT_MESSAGE_TOPIC, HEARTBEAT_MESSAGE_TOPIC,
PRODUCT_OSD_TOPIC, PRODUCT_OSD_TOPIC,
PRODUCT_EVENTS_TOPIC PRODUCT_EVENTS_TOPIC