From cd12f726cbd1a8d9a0b0f3bf2333f91979747254 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Thu, 18 Dec 2025 16:30:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Lua=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=81=A5=E5=A3=AE=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mqtt/store/RedisMqttCallbackStore.java | 106 ++++++++++++++---- src/main/resources/application.yml | 2 +- 2 files changed, 83 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java index 9ebe829..d47522a 100644 --- a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java +++ b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java @@ -9,6 +9,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; @@ -52,6 +53,39 @@ public class RedisMqttCallbackStore implements MqttCallbackStore { // 配置回调信息的过期时间 private static final long EXPIRE_SECONDS = 3600; // 1小时 + /** + * Lua 脚本:注册 MQTT 回调 + * 使用 Lua 脚本保证原子性,避免竞态条件 + * + * KEYS[1]: Topic 索引 key (mqtt:topic:{topic}) + * KEYS[2]: 回调信息 key (mqtt:callback:{callbackId}) + * ARGV[1]: callbackId + * ARGV[2]: 过期时间(秒) + * ARGV[3]: 回调信息 JSON + * + * 返回值: 1 表示成功 + */ + private static final String REGISTER_CALLBACK_SCRIPT = + "redis.call('SADD', KEYS[1], ARGV[1]) " + + "redis.call('EXPIRE', KEYS[1], ARGV[2]) " + + "redis.call('SETEX', KEYS[2], ARGV[2], ARGV[3]) " + + "return 1"; + + /** + * Lua 脚本:取消注册 MQTT 回调 + * 使用 Lua 脚本保证原子性 + * + * KEYS[1]: Topic 索引 key (mqtt:topic:{topic}) + * KEYS[2]: 回调信息 key (mqtt:callback:{callbackId}) + * ARGV[1]: callbackId + * + * 返回值: 1 表示成功 + */ + private static final String UNREGISTER_CALLBACK_SCRIPT = + "redis.call('SREM', KEYS[1], ARGV[1]) " + + "redis.call('DEL', KEYS[2]) " + + "return 1"; + public RedisMqttCallbackStore( StringRedisTemplate stringRedisTemplate, @Qualifier("machineFrameworkRedisMessageListenerContainer") RedisMessageListenerContainer redisMessageListenerContainer, @@ -65,43 +99,67 @@ public class RedisMqttCallbackStore implements MqttCallbackStore { @Override public void registerCallback(MqttCallbackInfo callbackInfo) { try { - // 1. 将 MqttCallbackInfo 序列化为 JSON + // 1. 序列化回调信息为 JSON String json = objectMapper.writeValueAsString(callbackInfo); - // 2. 存储到 Redis Hash - String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId(); - stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS); - - // 3. 添加到 Topic 索引 + // 2. 准备 Redis key String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); - stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId()); - stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS); + String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId(); + + // 3. 使用 Lua 脚本原子性地注册回调 + // 先添加到 Topic 索引,再存储回调信息,避免竞态条件 + stringRedisTemplate.execute( + new DefaultRedisScript<>(REGISTER_CALLBACK_SCRIPT, Long.class), + Arrays.asList(topicKey, callbackKey), + callbackInfo.getCallbackId(), + String.valueOf(EXPIRE_SECONDS), + json + ); log.debug("注册MQTT回调到Redis: callbackId={}, topic={}", callbackInfo.getCallbackId(), callbackInfo.getTopic()); + } catch (JsonProcessingException e) { - log.error("序列化回调信息失败: callbackId={}", callbackInfo.getCallbackId(), e); + log.error("序列化回调信息失败: callbackId={}, topic={}", + callbackInfo.getCallbackId(), callbackInfo.getTopic(), e); + throw new RuntimeException("注册MQTT回调失败: 序列化错误", e); + } catch (Exception e) { + log.error("注册MQTT回调到Redis失败: callbackId={}, topic={}", + callbackInfo.getCallbackId(), callbackInfo.getTopic(), e); + // 不抛出异常,让上层通过超时机制处理 + // 这样可以避免因为 Redis 临时故障导致整个命令执行失败 } } @Override public void unregisterCallback(String callbackId) { - // 1. 获取回调信息 - MqttCallbackInfo callbackInfo = getCallbackById(callbackId); - if (callbackInfo == null) { - return; + try { + // 1. 获取回调信息(需要知道 topic 才能删除索引) + MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + if (callbackInfo == null) { + log.debug("回调信息不存在,无需取消注册: callbackId={}", callbackId); + return; + } + + // 2. 准备 Redis key + String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); + String callbackKey = CALLBACK_KEY_PREFIX + callbackId; + + // 3. 使用 Lua 脚本原子性地取消注册回调 + stringRedisTemplate.execute( + new DefaultRedisScript<>(UNREGISTER_CALLBACK_SCRIPT, Long.class), + Arrays.asList(topicKey, callbackKey), + callbackId + ); + + log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}", + callbackId, callbackInfo.getTopic()); + + } catch (Exception e) { + log.error("从Redis中取消注册MQTT回调失败: callbackId={}", callbackId, e); + // 不抛出异常,取消注册失败不影响主流程 + // 回调会因为 TTL 自动过期 } - - // 2. 从 Topic 索引中移除 - String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); - stringRedisTemplate.opsForSet().remove(topicKey, callbackId); - - // 3. 删除回调信息 - String callbackKey = CALLBACK_KEY_PREFIX + callbackId; - stringRedisTemplate.delete(callbackKey); - - log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}", - callbackId, callbackInfo.getTopic()); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 2dffaba..44f9a90 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -19,7 +19,7 @@ spring: machine: state: store: - type: memory + type: redis sn: repository: type: memory