diff --git a/pom.xml b/pom.xml index b59866c..f22b25b 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,13 @@ spring-integration-mqtt + + + org.eclipse.paho + org.eclipse.paho.mqttv5.client + 1.2.5 + + diff --git a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java index ad3ab9a..c158eae 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java +++ b/src/main/java/com/ruoyi/device/domain/impl/djimqtt/service/DjiMqttClientService.java @@ -3,17 +3,19 @@ package com.ruoyi.device.domain.impl.djimqtt.service; import com.ruoyi.device.domain.impl.djimqtt.config.DjiMqttClientConfig; import com.ruoyi.device.domain.impl.djimqtt.handler.DjiMqttMessageHandler; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.messaging.Message; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttCallback; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; /** - * DJI MQTT客户端服务(基于Spring Integration MQTT) - * 支持动态创建多个客户端,自动重连和重新订阅 + * DJI MQTT客户端服务(基于Eclipse Paho MQTT v5) + * 支持MQTT 5.0协议,动态创建多个客户端 * * @author ruoyi */ @@ -22,7 +24,7 @@ public class DjiMqttClientService { private final DjiMqttClientConfig config; private final DjiMqttMessageHandler messageHandler; - private MqttPahoMessageDrivenChannelAdapter adapter; + private MqttClient mqttClient; /** * 无人机OSD主题 @@ -50,7 +52,7 @@ public class DjiMqttClientService { */ public void connect() { try { - if (adapter != null && adapter.isRunning()) { + if (mqttClient != null && mqttClient.isConnected()) { log.info("MQTT客户端[{}]已连接,无需重复连接", config.getClientId()); return; } @@ -58,91 +60,102 @@ public class DjiMqttClientService { String broker = String.format("tcp://%s:%d", config.getHost(), config.getPort()); log.info("开始连接DJI MQTT服务器[{}]: {}", config.getClientId(), broker); - // 创建MQTT客户端工厂 - MqttPahoClientFactory clientFactory = createClientFactory(broker); + mqttClient = new MqttClient(broker, config.getClientId(), new MemoryPersistence()); - // 构建订阅主题 - String[] topics = buildTopics(); + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setUserName(config.getUsername()); + options.setPassword(config.getPassword().getBytes()); + options.setConnectionTimeout(config.getConnectionTimeout()); + options.setKeepAliveInterval(config.getKeepAliveInterval()); + options.setAutomaticReconnect(config.getAutoReconnect()); + options.setCleanStart(config.getCleanSession()); - // 创建消息驱动适配器 - adapter = new MqttPahoMessageDrivenChannelAdapter( - config.getClientId(), - clientFactory, - topics - ); + mqttClient.setCallback(new MqttCallback() { + @Override + public void disconnected(MqttDisconnectResponse disconnectResponse) { + log.error("MQTT客户端[{}]连接丢失: {}", config.getClientId(), + disconnectResponse.getReasonString()); - // 设置消息转换器 - adapter.setConverter(new DefaultPahoMessageConverter()); + if (config.getAutoReconnect()) { + log.info("MQTT客户端[{}]将自动重连...", config.getClientId()); + } + } - // 设置QoS - adapter.setQos(1); + @Override + public void mqttErrorOccurred(MqttException exception) { + log.error("MQTT客户端[{}]发生错误: {}", config.getClientId(), + exception.getMessage(), exception); + } - // 创建消息通道并设置消息处理器 - DirectChannel channel = new DirectChannel(); - channel.subscribe(this::handleMessage); - adapter.setOutputChannel(channel); + @Override + public void messageArrived(String topic, MqttMessage message) { + try { + String payload = new String(message.getPayload()); + messageHandler.handleMessage(topic, payload); + } catch (Exception e) { + log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), + e.getMessage(), e); + } + } - // 启动适配器 - adapter.start(); + @Override + public void deliveryComplete(IMqttToken token) { + // 不需要处理 + } - log.info("MQTT客户端[{}]成功连接到服务器并订阅主题", config.getClientId()); + @Override + public void connectComplete(boolean reconnect, String serverURI) { + if (reconnect) { + log.info("MQTT客户端[{}]重连成功: {}", config.getClientId(), serverURI); + // 重连后重新订阅 + subscribe(); + } else { + log.info("MQTT客户端[{}]首次连接成功: {}", config.getClientId(), serverURI); + } + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) { + // 不需要处理 + } + }); + + mqttClient.connect(options); + log.info("MQTT客户端[{}]成功连接到服务器", config.getClientId()); + + subscribe(); } catch (Exception e) { log.error("MQTT客户端[{}]连接失败: {}", config.getClientId(), e.getMessage(), e); - throw new RuntimeException("MQTT连接失败", e); } } /** - * 创建MQTT客户端工厂 + * 订阅主题 */ - private MqttPahoClientFactory createClientFactory(String broker) { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - - MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(new String[]{broker}); - options.setUserName(config.getUsername()); - options.setPassword(config.getPassword().toCharArray()); - options.setConnectionTimeout(config.getConnectionTimeout()); - options.setKeepAliveInterval(config.getKeepAliveInterval()); - options.setAutomaticReconnect(config.getAutoReconnect()); - options.setCleanSession(config.getCleanSession()); - - // 设置MQTT版本为5.0 - options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_5_0); - - factory.setConnectionOptions(options); - return factory; - } - - /** - * 构建订阅主题 - */ - private String[] buildTopics() { - String osdTopic = config.getUseSharedSubscription() - ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC) - : DRONE_OSD_TOPIC; - - String stateTopic = config.getUseSharedSubscription() - ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC) - : DRONE_STATE_TOPIC; - - return new String[]{osdTopic, stateTopic}; - } - - /** - * 处理接收到的MQTT消息 - */ - private void handleMessage(Message message) { + private void subscribe() { try { - String topic = (String) message.getHeaders().get("mqtt_receivedTopic"); - String payload = (String) message.getPayload(); - - if (topic != null) { - messageHandler.handleMessage(topic, payload); + if (mqttClient == null || !mqttClient.isConnected()) { + log.warn("MQTT客户端[{}]未连接,无法订阅主题", config.getClientId()); + return; } + + String osdTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_OSD_TOPIC) + : DRONE_OSD_TOPIC; + + String stateTopic = config.getUseSharedSubscription() + ? String.format("$share/%s/%s", config.getSharedGroupName(), DRONE_STATE_TOPIC) + : DRONE_STATE_TOPIC; + + mqttClient.subscribe(osdTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), osdTopic); + + mqttClient.subscribe(stateTopic, 1); + log.info("MQTT客户端[{}]成功订阅主题: {}", config.getClientId(), stateTopic); + } catch (Exception e) { - log.error("MQTT客户端[{}]处理消息失败: {}", config.getClientId(), e.getMessage(), e); + log.error("MQTT客户端[{}]订阅主题失败: {}", config.getClientId(), e.getMessage(), e); } } @@ -151,8 +164,9 @@ public class DjiMqttClientService { */ public void disconnect() { try { - if (adapter != null && adapter.isRunning()) { - adapter.stop(); + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.disconnect(); + mqttClient.close(); log.info("MQTT客户端[{}]已断开连接", config.getClientId()); } } catch (Exception e) { @@ -164,7 +178,7 @@ public class DjiMqttClientService { * 检查连接状态 */ public boolean isConnected() { - return adapter != null && adapter.isRunning(); + return mqttClient != null && mqttClient.isConnected(); } /**