diff --git a/pom.xml b/pom.xml
index 43a19fa..d58dea7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,5 +71,17 @@
true
+
+
+ org.springframework.boot
+ spring-boot-starter-data-redis
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java
index 39f5f7e..099c68c 100644
--- a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java
+++ b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java
@@ -1,13 +1,19 @@
package com.tuoheng.machine.mqtt.store;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
/**
* 基于 Redis 的 MQTT 回调存储实现
@@ -27,192 +33,175 @@ import java.util.List;
* 1. 在 application.properties 中配置:machine.state.store.type=redis
* 2. 配置 Redis 连接信息
* 3. 实现 Redis 相关的序列化和 Pub/Sub 逻辑
- *
- * 注意:当前为空实现,需要在生产环境部署时完善
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis")
public class RedisMqttCallbackStore implements MqttCallbackStore {
- // TODO: 注入 RedisTemplate 和 StringRedisTemplate
- // private final RedisTemplate redisTemplate;
- // private final StringRedisTemplate stringRedisTemplate;
-
- // TODO: 注入 ObjectMapper 用于序列化
- // private final ObjectMapper objectMapper;
+ private final StringRedisTemplate stringRedisTemplate;
+ private final RedisMessageListenerContainer redisMessageListenerContainer;
+ private final ObjectMapper objectMapper;
// Redis key 前缀
private static final String CALLBACK_KEY_PREFIX = "mqtt:callback:";
private static final String TOPIC_INDEX_PREFIX = "mqtt:topic:";
private static final String NODE_CHANNEL_PREFIX = "mqtt:node:";
- // TODO: 配置回调信息的过期时间(可选)
- // private static final long EXPIRE_SECONDS = 3600; // 1小时
+ // 配置回调信息的过期时间
+ private static final long EXPIRE_SECONDS = 3600; // 1小时
- public RedisMqttCallbackStore() {
- log.warn("使用 Redis MQTT 回调存储实现,但当前为空实现,请在生产环境部署前完善");
+ public RedisMqttCallbackStore(StringRedisTemplate stringRedisTemplate,
+ RedisMessageListenerContainer redisMessageListenerContainer,
+ ObjectMapper objectMapper) {
+ this.stringRedisTemplate = stringRedisTemplate;
+ this.redisMessageListenerContainer = redisMessageListenerContainer;
+ this.objectMapper = objectMapper;
+ log.info("使用 Redis MQTT 回调存储实现");
}
@Override
public void registerCallback(MqttCallbackInfo callbackInfo) {
- // TODO: 实现注册回调到 Redis
- // 1. 将 MqttCallbackInfo 序列化为 JSON
- // String json = objectMapper.writeValueAsString(callbackInfo);
- //
- // 2. 存储到 Redis Hash
- // String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId();
- // stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS);
- //
- // 3. 添加到 Topic 索引
- // String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
- // stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId());
- // stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS);
- //
- // log.debug("注册MQTT回调到Redis: callbackId={}, topic={}",
- // callbackInfo.getCallbackId(), callbackInfo.getTopic());
+ try {
+ // 1. 将 MqttCallbackInfo 序列化为 JSON
+ String json = objectMapper.writeValueAsString(callbackInfo);
- log.warn("Redis MQTT 回调存储未实现,跳过注册: callbackId={}", callbackInfo.getCallbackId());
+ // 2. 存储到 Redis Hash
+ String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId();
+ stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS);
+
+ // 3. 添加到 Topic 索引
+ String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
+ stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId());
+ stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS);
+
+ log.debug("注册MQTT回调到Redis: callbackId={}, topic={}",
+ callbackInfo.getCallbackId(), callbackInfo.getTopic());
+ } catch (JsonProcessingException e) {
+ log.error("序列化回调信息失败: callbackId={}", callbackInfo.getCallbackId(), e);
+ }
}
@Override
public void unregisterCallback(String callbackId) {
- // TODO: 实现从 Redis 删除回调
// 1. 获取回调信息
- // MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
- // if (callbackInfo == null) {
- // return;
- // }
- //
- // 2. 从 Topic 索引中移除
- // String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
- // stringRedisTemplate.opsForSet().remove(topicKey, callbackId);
- //
- // 3. 删除回调信息
- // String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
- // stringRedisTemplate.delete(callbackKey);
- //
- // log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}",
- // callbackId, callbackInfo.getTopic());
+ MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
+ if (callbackInfo == null) {
+ return;
+ }
- log.warn("Redis MQTT 回调存储未实现,跳过取消注册: callbackId={}", callbackId);
+ // 2. 从 Topic 索引中移除
+ String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
+ stringRedisTemplate.opsForSet().remove(topicKey, callbackId);
+
+ // 3. 删除回调信息
+ String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
+ stringRedisTemplate.delete(callbackKey);
+
+ log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}",
+ callbackId, callbackInfo.getTopic());
}
@Override
public List getCallbacksByTopic(String topic) {
- // TODO: 实现从 Redis 获取指定 topic 的所有回调
// 1. 从 Topic 索引获取所有 callbackId
- // String topicKey = TOPIC_INDEX_PREFIX + topic;
- // Set callbackIds = stringRedisTemplate.opsForSet().members(topicKey);
- // if (callbackIds == null || callbackIds.isEmpty()) {
- // return new ArrayList<>();
- // }
- //
- // 2. 批量获取回调信息
- // List callbacks = new ArrayList<>();
- // for (String callbackId : callbackIds) {
- // MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
- // if (callbackInfo != null) {
- // callbacks.add(callbackInfo);
- // }
- // }
- // return callbacks;
+ String topicKey = TOPIC_INDEX_PREFIX + topic;
+ Set callbackIds = stringRedisTemplate.opsForSet().members(topicKey);
+ if (callbackIds == null || callbackIds.isEmpty()) {
+ return new ArrayList<>();
+ }
- log.warn("Redis MQTT 回调存储未实现,返回空列表: topic={}", topic);
- return new ArrayList<>();
+ // 2. 批量获取回调信息
+ List callbacks = new ArrayList<>();
+ for (String callbackId : callbackIds) {
+ MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
+ if (callbackInfo != null) {
+ callbacks.add(callbackInfo);
+ }
+ }
+ return callbacks;
}
@Override
public MqttCallbackInfo getCallbackById(String callbackId) {
- // TODO: 实现从 Redis 获取回调信息
- // String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
- // String json = stringRedisTemplate.opsForValue().get(callbackKey);
- // if (json == null) {
- // return null;
- // }
- //
- // try {
- // return objectMapper.readValue(json, MqttCallbackInfo.class);
- // } catch (JsonProcessingException e) {
- // log.error("反序列化回调信息失败: callbackId={}", callbackId, e);
- // return null;
- // }
+ String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
+ String json = stringRedisTemplate.opsForValue().get(callbackKey);
+ if (json == null) {
+ return null;
+ }
- log.warn("Redis MQTT 回调存储未实现,返回 null: callbackId={}", callbackId);
- return null;
+ try {
+ return objectMapper.readValue(json, MqttCallbackInfo.class);
+ } catch (JsonProcessingException e) {
+ log.error("反序列化回调信息失败: callbackId={}", callbackId, e);
+ return null;
+ }
}
@Override
public List getAllCallbacks() {
- // TODO: 实现获取所有回调信息(用于清理超时回调)
// 1. 扫描所有 mqtt:callback:* 的 key
- // Set keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*");
- // if (keys == null || keys.isEmpty()) {
- // return new ArrayList<>();
- // }
- //
- // 2. 批量获取回调信息
- // List callbacks = new ArrayList<>();
- // for (String key : keys) {
- // String callbackId = key.substring(CALLBACK_KEY_PREFIX.length());
- // MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
- // if (callbackInfo != null) {
- // callbacks.add(callbackInfo);
- // }
- // }
- // return callbacks;
+ Set keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*");
+ if (keys == null || keys.isEmpty()) {
+ return new ArrayList<>();
+ }
- log.warn("Redis MQTT 回调存储未实现,返回空列表");
- return new ArrayList<>();
+ // 2. 批量获取回调信息
+ List callbacks = new ArrayList<>();
+ for (String key : keys) {
+ String callbackId = key.substring(CALLBACK_KEY_PREFIX.length());
+ MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
+ if (callbackInfo != null) {
+ callbacks.add(callbackInfo);
+ }
+ }
+ return callbacks;
}
@Override
public void publishMessageToNode(String nodeId, String callbackId, String messageBody) {
- // TODO: 实现通过 Redis Pub/Sub 发布消息到指定节点
- // 1. 构造消息体
- // Map message = new HashMap<>();
- // message.put("callbackId", callbackId);
- // message.put("messageBody", messageBody);
- //
- // 2. 序列化消息
- // String json = objectMapper.writeValueAsString(message);
- //
- // 3. 发布到节点频道
- // String channel = NODE_CHANNEL_PREFIX + nodeId;
- // stringRedisTemplate.convertAndSend(channel, json);
- //
- // log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}",
- // nodeId, callbackId, channel);
+ try {
+ // 1. 构造消息体
+ Map message = new HashMap<>();
+ message.put("callbackId", callbackId);
+ message.put("messageBody", messageBody);
- log.warn("Redis MQTT 回调存储未实现,跳过发布消息: nodeId={}, callbackId={}", nodeId, callbackId);
+ // 2. 序列化消息
+ String json = objectMapper.writeValueAsString(message);
+
+ // 3. 发布到节点频道
+ String channel = NODE_CHANNEL_PREFIX + nodeId;
+ stringRedisTemplate.convertAndSend(channel, json);
+
+ log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}",
+ nodeId, callbackId, channel);
+ } catch (JsonProcessingException e) {
+ log.error("序列化节点消息失败: nodeId={}, callbackId={}", nodeId, callbackId, e);
+ }
}
@Override
public void subscribeNodeMessages(String nodeId, NodeMessageListener messageListener) {
- // TODO: 实现订阅当前节点的 Redis Pub/Sub 消息
// 1. 创建消息监听器
- // MessageListener redisMessageListener = (message, pattern) -> {
- // try {
- // String json = new String(message.getBody());
- // Map data = objectMapper.readValue(json,
- // new TypeReference