package com.ruoyi.device.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.device.domain.api.IDockAircraftDomain; import com.ruoyi.device.domain.api.IDockDomain; import com.ruoyi.device.domain.api.IAircraftDomain; import com.ruoyi.device.domain.api.IDeviceDomain; import com.ruoyi.device.domain.impl.machine.mqtt.MqttCallbackRegistry; import com.ruoyi.device.domain.impl.machine.state.DroneState; import com.ruoyi.device.domain.impl.machine.state.AirportState; import com.ruoyi.device.domain.impl.machine.state.MachineStates; import com.ruoyi.device.domain.impl.machine.statemachine.MachineStateManager; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengEventsCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengOsdCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.callback.ITuohengRealTimeDataCallback; import com.ruoyi.device.domain.impl.tuohengmqtt.config.TuohengMqttClientConfig; import com.ruoyi.device.domain.impl.tuohengmqtt.handler.TuohengMqttMessageHandler; import com.ruoyi.device.domain.impl.tuohengmqtt.manager.TuohengMqttClientManager; import com.ruoyi.device.domain.impl.tuohengmqtt.model.AirportOsdData; import com.ruoyi.device.domain.impl.tuohengmqtt.model.EventsData; import com.ruoyi.device.domain.impl.tuohengmqtt.model.TuohengRealTimeData; import com.ruoyi.device.domain.model.Aircraft; import com.ruoyi.device.domain.model.Device; import com.ruoyi.device.domain.model.Dock; import com.ruoyi.device.domain.model.DockAircraft; import com.ruoyi.device.service.config.TuohengMqttProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; @Service @Slf4j public class TuohengService { @Autowired private TuohengMqttClientManager clientManager; @Autowired private TuohengMqttProperties mqttProperties; @Autowired private IDockAircraftDomain dockAircraftDomain; @Autowired private IDockDomain dockDomain; @Autowired private IAircraftDomain aircraftDomain; @Autowired private IDeviceDomain deviceDomain; @Autowired private MachineStateManager stateManager; @Autowired private MqttCallbackRegistry mqttCallbackRegistry; private final ObjectMapper objectMapper = new ObjectMapper(); /** * 机场心跳时间戳记录 (deviceSn -> lastHeartbeatTime) */ private final Map airportHeartbeatMap = new java.util.concurrent.ConcurrentHashMap<>(); /** * 心跳超时时间:5分钟 */ private static final long HEARTBEAT_TIMEOUT = 5 * 60 * 1000; @EventListener(ApplicationReadyEvent.class) public void onApplicationReady() { TuohengMqttClientConfig config = TuohengMqttClientConfig.builder() .host(mqttProperties.getHost()) .port(mqttProperties.getPort()) .clientId(mqttProperties.getClientId()) .username(mqttProperties.getUsername()) .password(mqttProperties.getPassword()) .connectionTimeout(mqttProperties.getConnectionTimeout()) .keepAliveInterval(mqttProperties.getKeepAliveInterval()) .autoReconnect(mqttProperties.getAutoReconnect()) .cleanSession(mqttProperties.getCleanSession()) .useSharedSubscription(true) .sharedGroupName("tuoheng-group") .build(); clientManager.initClient(config); TuohengMqttMessageHandler handler = clientManager.getHandler(); // 设置 MqttCallbackRegistry 到 handler(用于指令回调) handler.setMqttCallbackRegistry(mqttCallbackRegistry); Map mapping = loadAirportDroneMapping(); handler.registerRealTimeDataCallback(new ITuohengRealTimeDataCallback() { @Override public void onRealTimeData(String deviceSn, TuohengRealTimeData data) { log.info("========== 收到拓恒实时数据 =========="); log.info("设备SN: {}", deviceSn); try { log.info("数据内容: {}", objectMapper.writeValueAsString(data)); // 更新机场心跳时间戳 updateAirportHeartbeat(deviceSn); // 同步无人机开关机状态 syncDronePowerState(deviceSn, data); } catch (Exception e) { log.error("处理实时数据失败", e); } log.info("====================================="); } }); handler.registerOsdCallback(new ITuohengOsdCallback() { @Override public void onOsdData(String deviceSn, AirportOsdData data) { log.info("========== 收到拓恒OSD数据 =========="); log.info("设备SN: {}", deviceSn); try { log.info("数据内容: {}", objectMapper.writeValueAsString(data)); // 同步飞行状态到 MachineStateManager syncFlightState(deviceSn, data); } catch (Exception e) { log.error("序列化数据失败", e); } log.info("====================================="); } }); handler.registerEventsCallback(new ITuohengEventsCallback() { @Override public void onEventsData(String deviceSn, EventsData data) { log.info("========== 收到拓恒Events数据 =========="); log.info("设备SN: {}", deviceSn); try { log.info("数据内容: {}", objectMapper.writeValueAsString(data)); } catch (Exception e) { log.error("序列化数据失败", e); } log.info("====================================="); } }); log.info("TuohengService 初始化完成,已注册所有回调"); } private Map loadAirportDroneMapping() { Map mapping = new HashMap<>(); try { List dockAircraftList = dockAircraftDomain.selectDockAircraftList(new DockAircraft()); for (DockAircraft dockAircraft : dockAircraftList) { Dock dock = dockDomain.selectDockByDockId(dockAircraft.getDockId()); Aircraft aircraft = aircraftDomain.selectAircraftByAircraftId(dockAircraft.getAircraftId()); if (dock != null && aircraft != null) { Device dockDevice = deviceDomain.selectDeviceByDeviceId(dock.getDeviceId()); Device aircraftDevice = deviceDomain.selectDeviceByDeviceId(aircraft.getDeviceId()); if (dockDevice != null && aircraftDevice != null) { String airportSn = dockDevice.getDeviceSn(); String droneSn = aircraftDevice.getDeviceSn(); if (airportSn != null && droneSn != null) { if (airportSn.startsWith("THJS") || droneSn.startsWith("THJS")) { log.debug("跳过大疆设备 - 机场SN: {}, 无人机SN: {}", airportSn, droneSn); continue; } mapping.put(airportSn, droneSn); log.info("加载机场-无人机映射: {} -> {}", airportSn, droneSn); } } } } log.info("从数据库加载机场-无人机映射完成,共 {} 条记录", mapping.size()); } catch (Exception e) { log.error("从数据库加载机场-无人机映射失败", e); } return mapping; } /** * 同步飞行状态到 MachineStateManager * 根据 OSD 数据中的 flighttask_step_code 和 mode_code 判断飞行状态 */ private void syncFlightState(String deviceSn, AirportOsdData data) { try { if (data == null) { return; } String flighttaskStepCode = data.getFlighttaskStepCode(); String modeCode = data.getModeCode(); // 同步无人机状态 DroneState droneState = determineDroneState(flighttaskStepCode, modeCode); if (droneState != null) { stateManager.setDroneState(deviceSn, droneState); log.debug("同步飞行状态: sn={}, flighttaskStepCode={}, modeCode={}, state={}", deviceSn, flighttaskStepCode, modeCode, droneState); } // 注意:机场在线状态由 IOT 平台的心跳机制判断(5分钟超时) // 不在这里简单地根据收到数据就判断为在线 } catch (Exception e) { log.error("同步飞行状态失败: sn={}", deviceSn, e); } } /** * 根据任务状态码和模式码判断无人机状态 */ private DroneState determineDroneState(String flighttaskStepCode, String modeCode) { // 优先根据 flighttask_step_code 判断 if (flighttaskStepCode != null) { switch (flighttaskStepCode) { case "1": // 飞行作业中 return DroneState.FLYING; case "2": // 作业后状态恢复,可能是返航或已到达 return DroneState.RETURNING; case "5": // 任务空闲 return DroneState.ONLINE; case "255": // 飞行器异常 return DroneState.UNKNOWN; } } // 根据 mode_code 辅助判断 if (modeCode != null) { if (modeCode.equals("3") || modeCode.equals("4") || modeCode.equals("5")) { // 飞行中状态 return DroneState.FLYING; } } return null; } /** * 更新机场心跳时间戳并设置在线状态 */ private void updateAirportHeartbeat(String deviceSn) { long currentTime = System.currentTimeMillis(); airportHeartbeatMap.put(deviceSn, currentTime); // 收到心跳,设置机场为在线 stateManager.setAirportState(deviceSn, AirportState.ONLINE); log.debug("更新机场心跳: sn={}, time={}", deviceSn, currentTime); } /** * 同步无人机开关机状态 * 注意:只有关机时才更新状态,其他情况保持当前状态不变 */ private void syncDronePowerState(String deviceSn, TuohengRealTimeData data) { try { if (data == null || data.getDroneBattery() == null || data.getDroneBattery().getData() == null) { return; } Integer powerOn = data.getDroneBattery().getData().getBPowerON(); if (powerOn == null) { return; } // 只有关机时才更新状态为 POWER_OFF // 其他情况(开机、飞行中等)保持当前状态不变 if (powerOn == 2) { stateManager.setDroneState(deviceSn, DroneState.POWER_OFF); log.debug("同步无人机关机状态: sn={}, powerOn={}", deviceSn, powerOn); } } catch (Exception e) { log.error("同步无人机开关机状态失败: sn={}", deviceSn, e); } } /** * 定时检查机场心跳超时 * 每分钟执行一次 */ @Scheduled(fixedRate = 60000) public void checkAirportHeartbeatTimeout() { long currentTime = System.currentTimeMillis(); for (Map.Entry entry : airportHeartbeatMap.entrySet()) { String deviceSn = entry.getKey(); Long lastHeartbeatTime = entry.getValue(); long timeDiff = currentTime - lastHeartbeatTime; if (timeDiff > HEARTBEAT_TIMEOUT) { // 超时,设置为离线 stateManager.setAirportState(deviceSn, AirportState.OFFLINE); log.warn("机场心跳超时,设置为离线: sn={}, 超时时长={}秒", deviceSn, timeDiff / 1000); } } } }