From c218d2ae811c3fd338de1a389492753436506485 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Thu, 18 Dec 2025 14:49:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0redis=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E7=9A=84=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../machine/mqtt/MqttCallbackRegistry.java | 204 ++++++++++++---- .../mqtt/store/InMemoryMqttCallbackStore.java | 85 +++++++ .../machine/mqtt/store/MqttCallbackInfo.java | 52 +++++ .../machine/mqtt/store/MqttCallbackStore.java | 79 +++++++ .../mqtt/store/RedisMqttCallbackStore.java | 218 ++++++++++++++++++ 5 files changed, 593 insertions(+), 45 deletions(-) create mode 100644 src/main/java/com/tuoheng/machine/mqtt/store/InMemoryMqttCallbackStore.java create mode 100644 src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackInfo.java create mode 100644 src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackStore.java create mode 100644 src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java diff --git a/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java b/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java index 6864157..0cc3b2d 100644 --- a/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java +++ b/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java @@ -1,31 +1,81 @@ package com.tuoheng.machine.mqtt; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.tuoheng.machine.mqtt.store.MqttCallbackInfo; +import com.tuoheng.machine.mqtt.store.MqttCallbackStore; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import java.net.InetAddress; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; /** * MQTT回调注册中心 * 用于注册和管理MQTT消息的回调处理器,他的 handleMessage 需要被真实的MQTT回调去调用 + * + * 架构说明: + * - 回调元数据存储在 MqttCallbackStore 中(支持内存/Redis) + * - Consumer 回调函数存储在本地内存中(无法序列化) + * - 多节点部署时,通过 Redis Pub/Sub 在节点间传递消息 */ @Slf4j @Component public class MqttCallbackRegistry { /** - * 主题 -> 回调处理器列表 + * 回调存储层(支持内存、Redis等多种实现) */ - private final Map> topicHandlers = new ConcurrentHashMap<>(); + private final MqttCallbackStore callbackStore; /** - * 回调ID -> 回调处理器 + * 回调ID -> 本地消息处理器(Consumer 无法序列化,只能存储在本地) */ - private final Map handlerMap = new ConcurrentHashMap<>(); + private final Map> localHandlers = new ConcurrentHashMap<>(); + + /** + * 当前节点ID(用于 Redis Pub/Sub 路由) + */ + private String nodeId; + + /** + * ObjectMapper 用于序列化消息 + */ + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Value("${machine.node.id:#{null}}") + private String configuredNodeId; + + public MqttCallbackRegistry(MqttCallbackStore callbackStore) { + this.callbackStore = callbackStore; + } + + @PostConstruct + public void init() { + // 初始化节点ID + if (configuredNodeId != null && !configuredNodeId.isEmpty()) { + nodeId = configuredNodeId; + } else { + // 自动生成节点ID:主机名 + UUID + try { + String hostname = InetAddress.getLocalHost().getHostName(); + nodeId = hostname + "-" + UUID.randomUUID().toString().substring(0, 8); + } catch (Exception e) { + nodeId = "node-" + UUID.randomUUID().toString().substring(0, 8); + } + } + + // 订阅当前节点的消息(用于 Redis Pub/Sub) + callbackStore.subscribeNodeMessages(nodeId, this::handleNodeMessage); + + log.info("MQTT回调注册中心初始化完成,节点ID: {}, 存储实现: {}", + nodeId, callbackStore.getClass().getSimpleName()); + } /** * 注册回调 @@ -37,18 +87,23 @@ public class MqttCallbackRegistry { */ public String registerCallback(String topic, Consumer messageHandler, long timeoutMs) { String callbackId = UUID.randomUUID().toString(); - MqttCallbackHandler handler = new MqttCallbackHandler( - callbackId, - topic, - messageHandler, - timeoutMs, - System.currentTimeMillis() - ); - topicHandlers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(handler); - handlerMap.put(callbackId, handler); + // 1. 创建回调信息并存储到存储层 + MqttCallbackInfo callbackInfo = MqttCallbackInfo.builder() + .callbackId(callbackId) + .topic(topic) + .timeoutMs(timeoutMs) + .registerTime(System.currentTimeMillis()) + .nodeId(nodeId) + .build(); - log.debug("注册MQTT回调: callbackId={}, topic={}, timeoutMs={}", callbackId, topic, timeoutMs); + callbackStore.registerCallback(callbackInfo); + + // 2. 将 Consumer 存储到本地内存 + localHandlers.put(callbackId, messageHandler); + + log.debug("注册MQTT回调: callbackId={}, topic={}, timeoutMs={}, nodeId={}", + callbackId, topic, timeoutMs, nodeId); return callbackId; } @@ -58,68 +113,127 @@ public class MqttCallbackRegistry { * @param callbackId 回调ID */ public void unregisterCallback(String callbackId) { - MqttCallbackHandler handler = handlerMap.remove(callbackId); - if (handler != null) { - CopyOnWriteArrayList handlers = topicHandlers.get(handler.getTopic()); - if (handlers != null) { - handlers.remove(handler); - if (handlers.isEmpty()) { - topicHandlers.remove(handler.getTopic()); - } - } - log.debug("取消注册MQTT回调: callbackId={}, topic={}", callbackId, handler.getTopic()); - } + // 1. 从存储层删除回调信息 + callbackStore.unregisterCallback(callbackId); + + // 2. 从本地内存删除 Consumer + localHandlers.remove(callbackId); + + log.debug("取消注册MQTT回调: callbackId={}", callbackId); } /** - * 处理接收到的MQTT消息 + * 处理接收到的MQTT消息(由真实的 MQTT 客户端调用) * * @param topic 主题 * @param messageBody 消息体 */ public void handleMessage(String topic, Object messageBody) { - CopyOnWriteArrayList handlers = topicHandlers.get(topic); - if (handlers == null || handlers.isEmpty()) { + // 1. 从存储层获取所有等待该 topic 的回调信息 + List callbacks = callbackStore.getCallbacksByTopic(topic); + if (callbacks.isEmpty()) { return; } - log.debug("处理MQTT消息: topic={}, handlerCount={}", topic, handlers.size()); + log.debug("处理MQTT消息: topic={}, callbackCount={}", topic, callbacks.size()); - for (MqttCallbackHandler handler : handlers) { + // 2. 序列化消息体(用于跨节点传递) + String messageBodyJson; + try { + messageBodyJson = objectMapper.writeValueAsString(messageBody); + } catch (Exception e) { + log.error("序列化消息体失败: topic={}", topic, e); + return; + } + + // 3. 处理每个回调 + for (MqttCallbackInfo callbackInfo : callbacks) { try { // 检查是否超时 - if (handler.isTimeout()) { - log.warn("MQTT回调已超时: callbackId={}, topic={}", handler.getCallbackId(), topic); - unregisterCallback(handler.getCallbackId()); + if (callbackInfo.isTimeout()) { + log.warn("MQTT回调已超时: callbackId={}, topic={}", + callbackInfo.getCallbackId(), topic); + unregisterCallback(callbackInfo.getCallbackId()); continue; } - // 执行回调 - handler.getMessageHandler().accept(messageBody); + // 判断回调是在本节点还是其他节点 + if (nodeId.equals(callbackInfo.getNodeId())) { + // 本节点的回调,直接执行 + executeLocalCallback(callbackInfo.getCallbackId(), messageBody); + } else { + // 其他节点的回调,通过 Redis Pub/Sub 转发 + callbackStore.publishMessageToNode( + callbackInfo.getNodeId(), + callbackInfo.getCallbackId(), + messageBodyJson + ); + log.debug("转发消息到节点: nodeId={}, callbackId={}", + callbackInfo.getNodeId(), callbackInfo.getCallbackId()); + } } catch (Exception e) { - log.error("执行MQTT回调失败: callbackId={}, topic={}", handler.getCallbackId(), topic, e); + log.error("处理MQTT回调失败: callbackId={}, topic={}", + callbackInfo.getCallbackId(), topic, e); } } } + /** + * 执行本地回调 + * + * @param callbackId 回调ID + * @param messageBody 消息体 + */ + private void executeLocalCallback(String callbackId, Object messageBody) { + Consumer handler = localHandlers.get(callbackId); + if (handler != null) { + try { + handler.accept(messageBody); + log.debug("执行本地回调成功: callbackId={}", callbackId); + } catch (Exception e) { + log.error("执行本地回调失败: callbackId={}", callbackId, e); + } + } else { + log.warn("本地回调处理器不存在: callbackId={}", callbackId); + } + } + + /** + * 处理从 Redis Pub/Sub 接收到的节点消息 + * + * @param callbackId 回调ID + * @param messageBodyJson 消息体(JSON 字符串) + */ + private void handleNodeMessage(String callbackId, String messageBodyJson) { + try { + // 反序列化消息体 + Object messageBody = objectMapper.readValue(messageBodyJson, Object.class); + + // 执行本地回调 + executeLocalCallback(callbackId, messageBody); + } catch (Exception e) { + log.error("处理节点消息失败: callbackId={}", callbackId, e); + } + } + /** * 清理超时的回调 */ public void cleanupTimeoutCallbacks() { - handlerMap.values().removeIf(handler -> { - if (handler.isTimeout()) { - log.warn("清理超时的MQTT回调: callbackId={}, topic={}", handler.getCallbackId(), handler.getTopic()); - unregisterCallback(handler.getCallbackId()); - return true; + List allCallbacks = callbackStore.getAllCallbacks(); + for (MqttCallbackInfo callbackInfo : allCallbacks) { + if (callbackInfo.isTimeout()) { + log.warn("清理超时的MQTT回调: callbackId={}, topic={}", + callbackInfo.getCallbackId(), callbackInfo.getTopic()); + unregisterCallback(callbackInfo.getCallbackId()); } - return false; - }); + } } /** * 获取当前注册的回调数量 */ public int getCallbackCount() { - return handlerMap.size(); + return localHandlers.size(); } } diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/InMemoryMqttCallbackStore.java b/src/main/java/com/tuoheng/machine/mqtt/store/InMemoryMqttCallbackStore.java new file mode 100644 index 0000000..3062f8b --- /dev/null +++ b/src/main/java/com/tuoheng/machine/mqtt/store/InMemoryMqttCallbackStore.java @@ -0,0 +1,85 @@ +package com.tuoheng.machine.mqtt.store; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +/** + * 基于内存的 MQTT 回调存储实现 + * 适用于单节点部署或开发测试环境 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "memory", matchIfMissing = true) +public class InMemoryMqttCallbackStore implements MqttCallbackStore { + + /** + * 主题 -> 回调信息列表 + */ + private final Map> topicCallbacks = new ConcurrentHashMap<>(); + + /** + * 回调ID -> 回调信息 + */ + private final Map callbackMap = new ConcurrentHashMap<>(); + + @Override + public void registerCallback(MqttCallbackInfo callbackInfo) { + topicCallbacks.computeIfAbsent(callbackInfo.getTopic(), k -> new CopyOnWriteArrayList<>()) + .add(callbackInfo); + callbackMap.put(callbackInfo.getCallbackId(), callbackInfo); + log.debug("注册MQTT回调到内存: callbackId={}, topic={}", + callbackInfo.getCallbackId(), callbackInfo.getTopic()); + } + + @Override + public void unregisterCallback(String callbackId) { + MqttCallbackInfo callbackInfo = callbackMap.remove(callbackId); + if (callbackInfo != null) { + CopyOnWriteArrayList callbacks = topicCallbacks.get(callbackInfo.getTopic()); + if (callbacks != null) { + callbacks.remove(callbackInfo); + if (callbacks.isEmpty()) { + topicCallbacks.remove(callbackInfo.getTopic()); + } + } + log.debug("从内存中取消注册MQTT回调: callbackId={}, topic={}", + callbackId, callbackInfo.getTopic()); + } + } + + @Override + public List getCallbacksByTopic(String topic) { + CopyOnWriteArrayList callbacks = topicCallbacks.get(topic); + return callbacks != null ? new ArrayList<>(callbacks) : new ArrayList<>(); + } + + @Override + public MqttCallbackInfo getCallbackById(String callbackId) { + return callbackMap.get(callbackId); + } + + @Override + public List getAllCallbacks() { + return new ArrayList<>(callbackMap.values()); + } + + @Override + public void publishMessageToNode(String nodeId, String callbackId, String messageBody) { + // 内存实现中,不需要跨节点通信,此方法为空操作 + log.trace("内存实现不需要发布消息到节点: nodeId={}, callbackId={}", nodeId, callbackId); + } + + @Override + public void subscribeNodeMessages(String nodeId, NodeMessageListener messageListener) { + // 内存实现中,不需要订阅节点消息,此方法为空操作 + log.trace("内存实现不需要订阅节点消息: nodeId={}", nodeId); + } +} \ No newline at end of file diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackInfo.java b/src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackInfo.java new file mode 100644 index 0000000..e358e05 --- /dev/null +++ b/src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackInfo.java @@ -0,0 +1,52 @@ +package com.tuoheng.machine.mqtt.store; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * MQTT 回调信息(可序列化到 Redis) + * 不包含 Consumer,只包含回调的元数据 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MqttCallbackInfo implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 回调ID(用于取消注册) + */ + private String callbackId; + + /** + * 监听的主题 + */ + private String topic; + + /** + * 超时时间(毫秒) + */ + private long timeoutMs; + + /** + * 注册时间 + */ + private long registerTime; + + /** + * 注册该回调的节点ID(用于 Redis Pub/Sub 路由) + */ + private String nodeId; + + /** + * 是否已超时 + */ + public boolean isTimeout() { + return System.currentTimeMillis() - registerTime > timeoutMs; + } +} \ No newline at end of file diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackStore.java b/src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackStore.java new file mode 100644 index 0000000..7bc303d --- /dev/null +++ b/src/main/java/com/tuoheng/machine/mqtt/store/MqttCallbackStore.java @@ -0,0 +1,79 @@ +package com.tuoheng.machine.mqtt.store; + +import java.util.List; + +/** + * MQTT 回调存储接口 + * 提供回调信息的存储和获取抽象,支持多种实现(内存、Redis等) + */ +public interface MqttCallbackStore { + + /** + * 注册回调信息 + * + * @param callbackInfo 回调信息 + */ + void registerCallback(MqttCallbackInfo callbackInfo); + + /** + * 取消注册回调 + * + * @param callbackId 回调ID + */ + void unregisterCallback(String callbackId); + + /** + * 根据 topic 获取所有等待该 topic 的回调信息 + * + * @param topic MQTT 主题 + * @return 回调信息列表 + */ + List getCallbacksByTopic(String topic); + + /** + * 根据 callbackId 获取回调信息 + * + * @param callbackId 回调ID + * @return 回调信息,如果不存在返回 null + */ + MqttCallbackInfo getCallbackById(String callbackId); + + /** + * 获取所有回调信息(用于清理超时回调) + * + * @return 所有回调信息列表 + */ + List getAllCallbacks(); + + /** + * 发布消息到指定节点(用于 Redis Pub/Sub) + * 在内存实现中,此方法为空操作 + * + * @param nodeId 节点ID + * @param callbackId 回调ID + * @param messageBody 消息体(JSON 字符串) + */ + void publishMessageToNode(String nodeId, String callbackId, String messageBody); + + /** + * 订阅当前节点的消息(用于 Redis Pub/Sub) + * 在内存实现中,此方法为空操作 + * + * @param nodeId 当前节点ID + * @param messageListener 消息监听器 + */ + void subscribeNodeMessages(String nodeId, NodeMessageListener messageListener); + + /** + * 节点消息监听器 + */ + interface NodeMessageListener { + /** + * 处理接收到的消息 + * + * @param callbackId 回调ID + * @param messageBody 消息体(JSON 字符串) + */ + void onMessage(String callbackId, String messageBody); + } +} \ No newline at end of file diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java new file mode 100644 index 0000000..39f5f7e --- /dev/null +++ b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java @@ -0,0 +1,218 @@ +package com.tuoheng.machine.mqtt.store; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +/** + * 基于 Redis 的 MQTT 回调存储实现 + * 适用于多节点部署的生产环境 + * + * 架构说明: + * 1. 回调信息存储在 Redis Hash 中:mqtt:callback:{callbackId} -> MqttCallbackInfo (JSON) + * 2. Topic 索引存储在 Redis Set 中:mqtt:topic:{topic} -> Set + * 3. 使用 Redis Pub/Sub 在节点间传递 MQTT 消息:mqtt:node:{nodeId} -> {callbackId, messageBody} + * + * 工作流程: + * - 节点A 注册回调 -> 存储到 Redis + * - 节点B 收到 MQTT 消息 -> 从 Redis 查询等待该 topic 的回调 -> 通过 Pub/Sub 发送到对应节点 + * - 节点A 收到 Pub/Sub 消息 -> 执行本地的 Consumer 回调 + * + * 使用方式: + * 1. 在 application.properties 中配置:machine.state.store.type=redis + * 2. 配置 Redis 连接信息 + * 3. 实现 Redis 相关的序列化和 Pub/Sub 逻辑 + * + * 注意:当前为空实现,需要在生产环境部署时完善 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis") +public class RedisMqttCallbackStore implements MqttCallbackStore { + + // TODO: 注入 RedisTemplate 和 StringRedisTemplate + // private final RedisTemplate redisTemplate; + // private final StringRedisTemplate stringRedisTemplate; + + // TODO: 注入 ObjectMapper 用于序列化 + // private final ObjectMapper objectMapper; + + // Redis key 前缀 + private static final String CALLBACK_KEY_PREFIX = "mqtt:callback:"; + private static final String TOPIC_INDEX_PREFIX = "mqtt:topic:"; + private static final String NODE_CHANNEL_PREFIX = "mqtt:node:"; + + // TODO: 配置回调信息的过期时间(可选) + // private static final long EXPIRE_SECONDS = 3600; // 1小时 + + public RedisMqttCallbackStore() { + log.warn("使用 Redis MQTT 回调存储实现,但当前为空实现,请在生产环境部署前完善"); + } + + @Override + public void registerCallback(MqttCallbackInfo callbackInfo) { + // TODO: 实现注册回调到 Redis + // 1. 将 MqttCallbackInfo 序列化为 JSON + // String json = objectMapper.writeValueAsString(callbackInfo); + // + // 2. 存储到 Redis Hash + // String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId(); + // stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS); + // + // 3. 添加到 Topic 索引 + // String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); + // stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId()); + // stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS); + // + // log.debug("注册MQTT回调到Redis: callbackId={}, topic={}", + // callbackInfo.getCallbackId(), callbackInfo.getTopic()); + + log.warn("Redis MQTT 回调存储未实现,跳过注册: callbackId={}", callbackInfo.getCallbackId()); + } + + @Override + public void unregisterCallback(String callbackId) { + // TODO: 实现从 Redis 删除回调 + // 1. 获取回调信息 + // MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + // if (callbackInfo == null) { + // return; + // } + // + // 2. 从 Topic 索引中移除 + // String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); + // stringRedisTemplate.opsForSet().remove(topicKey, callbackId); + // + // 3. 删除回调信息 + // String callbackKey = CALLBACK_KEY_PREFIX + callbackId; + // stringRedisTemplate.delete(callbackKey); + // + // log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}", + // callbackId, callbackInfo.getTopic()); + + log.warn("Redis MQTT 回调存储未实现,跳过取消注册: callbackId={}", callbackId); + } + + @Override + public List getCallbacksByTopic(String topic) { + // TODO: 实现从 Redis 获取指定 topic 的所有回调 + // 1. 从 Topic 索引获取所有 callbackId + // String topicKey = TOPIC_INDEX_PREFIX + topic; + // Set callbackIds = stringRedisTemplate.opsForSet().members(topicKey); + // if (callbackIds == null || callbackIds.isEmpty()) { + // return new ArrayList<>(); + // } + // + // 2. 批量获取回调信息 + // List callbacks = new ArrayList<>(); + // for (String callbackId : callbackIds) { + // MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + // if (callbackInfo != null) { + // callbacks.add(callbackInfo); + // } + // } + // return callbacks; + + log.warn("Redis MQTT 回调存储未实现,返回空列表: topic={}", topic); + return new ArrayList<>(); + } + + @Override + public MqttCallbackInfo getCallbackById(String callbackId) { + // TODO: 实现从 Redis 获取回调信息 + // String callbackKey = CALLBACK_KEY_PREFIX + callbackId; + // String json = stringRedisTemplate.opsForValue().get(callbackKey); + // if (json == null) { + // return null; + // } + // + // try { + // return objectMapper.readValue(json, MqttCallbackInfo.class); + // } catch (JsonProcessingException e) { + // log.error("反序列化回调信息失败: callbackId={}", callbackId, e); + // return null; + // } + + log.warn("Redis MQTT 回调存储未实现,返回 null: callbackId={}", callbackId); + return null; + } + + @Override + public List getAllCallbacks() { + // TODO: 实现获取所有回调信息(用于清理超时回调) + // 1. 扫描所有 mqtt:callback:* 的 key + // Set keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*"); + // if (keys == null || keys.isEmpty()) { + // return new ArrayList<>(); + // } + // + // 2. 批量获取回调信息 + // List callbacks = new ArrayList<>(); + // for (String key : keys) { + // String callbackId = key.substring(CALLBACK_KEY_PREFIX.length()); + // MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + // if (callbackInfo != null) { + // callbacks.add(callbackInfo); + // } + // } + // return callbacks; + + log.warn("Redis MQTT 回调存储未实现,返回空列表"); + return new ArrayList<>(); + } + + @Override + public void publishMessageToNode(String nodeId, String callbackId, String messageBody) { + // TODO: 实现通过 Redis Pub/Sub 发布消息到指定节点 + // 1. 构造消息体 + // Map message = new HashMap<>(); + // message.put("callbackId", callbackId); + // message.put("messageBody", messageBody); + // + // 2. 序列化消息 + // String json = objectMapper.writeValueAsString(message); + // + // 3. 发布到节点频道 + // String channel = NODE_CHANNEL_PREFIX + nodeId; + // stringRedisTemplate.convertAndSend(channel, json); + // + // log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}", + // nodeId, callbackId, channel); + + log.warn("Redis MQTT 回调存储未实现,跳过发布消息: nodeId={}, callbackId={}", nodeId, callbackId); + } + + @Override + public void subscribeNodeMessages(String nodeId, NodeMessageListener messageListener) { + // TODO: 实现订阅当前节点的 Redis Pub/Sub 消息 + // 1. 创建消息监听器 + // MessageListener redisMessageListener = (message, pattern) -> { + // try { + // String json = new String(message.getBody()); + // Map data = objectMapper.readValue(json, + // new TypeReference>() {}); + // + // String callbackId = data.get("callbackId"); + // String messageBody = data.get("messageBody"); + // + // messageListener.onMessage(callbackId, messageBody); + // } catch (Exception e) { + // log.error("处理Redis Pub/Sub消息失败", e); + // } + // }; + // + // 2. 订阅节点频道 + // String channel = NODE_CHANNEL_PREFIX + nodeId; + // redisTemplate.getConnectionFactory().getConnection() + // .subscribe(redisMessageListener, channel.getBytes()); + // + // log.info("订阅节点消息: nodeId={}, channel={}", nodeId, channel); + + log.warn("Redis MQTT 回调存储未实现,跳过订阅节点消息: nodeId={}", nodeId); + } +} \ No newline at end of file