diff --git a/src/main/java/com/ruoyi/device/domain/impl/machine/mqtt/MqttClient.java b/src/main/java/com/ruoyi/device/domain/impl/machine/mqtt/MqttClient.java index 4a790c3..af24265 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/machine/mqtt/MqttClient.java +++ b/src/main/java/com/ruoyi/device/domain/impl/machine/mqtt/MqttClient.java @@ -1,14 +1,35 @@ package com.ruoyi.device.domain.impl.machine.mqtt; +import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * MQTT客户端 */ +@Slf4j @Component public class MqttClient { - public void sendMessage(String topic, String message) { + @Autowired + private TuohengMqttClientManager tuohengMqttClientManager; + public void sendMessage(String topic, String message) { + try { + log.info("发送MQTT消息: topic={}, message={}", topic, message); + + // 使用拓恒MQTT客户端发送消息 + if (tuohengMqttClientManager != null && tuohengMqttClientManager.isConnected()) { + tuohengMqttClientManager.getClient().publish(topic, message); + log.info("MQTT消息发送成功"); + } else { + log.error("MQTT客户端未连接,无法发送消息"); + throw new RuntimeException("MQTT客户端未连接"); + } + } catch (Exception e) { + log.error("发送MQTT消息失败: topic={}, message={}", topic, message, e); + throw new RuntimeException("发送MQTT消息失败: " + e.getMessage(), e); + } } } diff --git a/src/main/java/com/ruoyi/device/domain/impl/machine/vendor/tuoheng/instruction/TuohengPowerOnInstruction.java b/src/main/java/com/ruoyi/device/domain/impl/machine/vendor/tuoheng/instruction/TuohengPowerOnInstruction.java index e100abe..db39b9f 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/machine/vendor/tuoheng/instruction/TuohengPowerOnInstruction.java +++ b/src/main/java/com/ruoyi/device/domain/impl/machine/vendor/tuoheng/instruction/TuohengPowerOnInstruction.java @@ -21,7 +21,6 @@ public class TuohengPowerOnInstruction extends AbstractInstruction { public void executeRemoteCall(InstructionContext context) throws Exception { String sn = context.getSn(); log.info("发送拓恒无人机开机指令: sn={}", sn); - // 构建MQTT消息 JSONObject payload = new JSONObject(); payload.put("messageID", System.currentTimeMillis()); diff --git a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java index 24846b5..f288735 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java +++ b/src/main/java/com/ruoyi/device/domain/impl/tuohengmqtt/handler/TuohengMqttMessageHandler.java @@ -1,6 +1,7 @@ package com.ruoyi.device.domain.impl.tuohengmqtt.handler; import com.fasterxml.jackson.databind.ObjectMapper; +import com.ruoyi.device.domain.impl.machine.mqtt.MqttCallbackRegistry; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IDroneRealTimeCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.IRealTimeBasicCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengEventsCallback; @@ -23,6 +24,7 @@ import java.util.regex.Pattern; public class TuohengMqttMessageHandler { private final ObjectMapper objectMapper = new ObjectMapper(); + private MqttCallbackRegistry mqttCallbackRegistry; private final List realTimeDataCallbacks = new ArrayList<>(); private final List osdCallbacks = new ArrayList<>(); @@ -32,6 +34,14 @@ public class TuohengMqttMessageHandler { private static final Pattern TUOHENG_SN_PATTERN = Pattern.compile("^TH[0-9A-Z]+"); + /** + * 设置 MQTT 回调注册中心 + */ + public void setMqttCallbackRegistry(MqttCallbackRegistry mqttCallbackRegistry) { + this.mqttCallbackRegistry = mqttCallbackRegistry; + log.info("设置 MqttCallbackRegistry 成功"); + } + public void registerRealTimeDataCallback(ITuohengRealTimeDataCallback callback) { if (callback != null && !realTimeDataCallbacks.contains(callback)) { realTimeDataCallbacks.add(callback); @@ -71,6 +81,17 @@ public class TuohengMqttMessageHandler { try { log.debug("收到MQTT消息 - Topic: {}", topic); + // 通知 MqttCallbackRegistry 处理回调(用于指令回调) + if (mqttCallbackRegistry != null) { + try { + // 将 payload 解析为 JSON 对象传递给回调注册中心 + Object messageBody = objectMapper.readValue(payload, Object.class); + mqttCallbackRegistry.handleMessage(topic, messageBody); + } catch (Exception e) { + log.debug("通知回调注册中心失败: {}", e.getMessage()); + } + } + String deviceSn = extractDeviceSnFromTopic(topic); if (deviceSn == null) { diff --git a/src/main/java/com/ruoyi/device/service/impl/TuohengService.java b/src/main/java/com/ruoyi/device/service/impl/TuohengService.java index 5113887..9a51fc4 100644 --- a/src/main/java/com/ruoyi/device/service/impl/TuohengService.java +++ b/src/main/java/com/ruoyi/device/service/impl/TuohengService.java @@ -5,6 +5,7 @@ import com.ruoyi.device.domain.api.IDockAircraftDomain; import com.ruoyi.device.domain.api.IDockDomain; import com.ruoyi.device.domain.api.IAircraftDomain; import com.ruoyi.device.domain.api.IDeviceDomain; +import com.ruoyi.device.domain.impl.machine.mqtt.MqttCallbackRegistry; import com.ruoyi.device.domain.impl.machine.state.DroneState; import com.ruoyi.device.domain.impl.machine.state.AirportState; import com.ruoyi.device.domain.impl.machine.state.MachineStates; @@ -59,6 +60,9 @@ public class TuohengService { @Autowired private MachineStateManager stateManager; + @Autowired + private MqttCallbackRegistry mqttCallbackRegistry; + private final ObjectMapper objectMapper = new ObjectMapper(); /** @@ -91,6 +95,9 @@ public class TuohengService { TuohengMqttMessageHandler handler = clientManager.getHandler(); + // 设置 MqttCallbackRegistry 到 handler(用于指令回调) + handler.setMqttCallbackRegistry(mqttCallbackRegistry); + Map mapping = loadAirportDroneMapping(); handler.registerRealTimeDataCallback(new ITuohengRealTimeDataCallback() {