添加Lua脚本,增加健壮性

This commit is contained in:
孙小云 2025-12-18 16:30:10 +08:00
parent eabf668229
commit cd12f726cb
2 changed files with 83 additions and 25 deletions

View File

@ -9,6 +9,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@ -52,6 +53,39 @@ public class RedisMqttCallbackStore implements MqttCallbackStore {
// 配置回调信息的过期时间
private static final long EXPIRE_SECONDS = 3600; // 1小时
/**
* Lua 脚本注册 MQTT 回调
* 使用 Lua 脚本保证原子性避免竞态条件
*
* KEYS[1]: Topic 索引 key (mqtt:topic:{topic})
* KEYS[2]: 回调信息 key (mqtt:callback:{callbackId})
* ARGV[1]: callbackId
* ARGV[2]: 过期时间
* ARGV[3]: 回调信息 JSON
*
* 返回值: 1 表示成功
*/
private static final String REGISTER_CALLBACK_SCRIPT =
"redis.call('SADD', KEYS[1], ARGV[1]) " +
"redis.call('EXPIRE', KEYS[1], ARGV[2]) " +
"redis.call('SETEX', KEYS[2], ARGV[2], ARGV[3]) " +
"return 1";
/**
* Lua 脚本取消注册 MQTT 回调
* 使用 Lua 脚本保证原子性
*
* KEYS[1]: Topic 索引 key (mqtt:topic:{topic})
* KEYS[2]: 回调信息 key (mqtt:callback:{callbackId})
* ARGV[1]: callbackId
*
* 返回值: 1 表示成功
*/
private static final String UNREGISTER_CALLBACK_SCRIPT =
"redis.call('SREM', KEYS[1], ARGV[1]) " +
"redis.call('DEL', KEYS[2]) " +
"return 1";
public RedisMqttCallbackStore(
StringRedisTemplate stringRedisTemplate,
@Qualifier("machineFrameworkRedisMessageListenerContainer") RedisMessageListenerContainer redisMessageListenerContainer,
@ -65,43 +99,67 @@ public class RedisMqttCallbackStore implements MqttCallbackStore {
@Override
public void registerCallback(MqttCallbackInfo callbackInfo) {
try {
// 1. MqttCallbackInfo 序列化为 JSON
// 1. 序列化回调信息 JSON
String json = objectMapper.writeValueAsString(callbackInfo);
// 2. 存储到 Redis Hash
String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId();
stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS);
// 3. 添加到 Topic 索引
// 2. 准备 Redis key
String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId());
stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS);
String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId();
// 3. 使用 Lua 脚本原子性地注册回调
// 先添加到 Topic 索引再存储回调信息避免竞态条件
stringRedisTemplate.execute(
new DefaultRedisScript<>(REGISTER_CALLBACK_SCRIPT, Long.class),
Arrays.asList(topicKey, callbackKey),
callbackInfo.getCallbackId(),
String.valueOf(EXPIRE_SECONDS),
json
);
log.debug("注册MQTT回调到Redis: callbackId={}, topic={}",
callbackInfo.getCallbackId(), callbackInfo.getTopic());
} catch (JsonProcessingException e) {
log.error("序列化回调信息失败: callbackId={}", callbackInfo.getCallbackId(), e);
log.error("序列化回调信息失败: callbackId={}, topic={}",
callbackInfo.getCallbackId(), callbackInfo.getTopic(), e);
throw new RuntimeException("注册MQTT回调失败: 序列化错误", e);
} catch (Exception e) {
log.error("注册MQTT回调到Redis失败: callbackId={}, topic={}",
callbackInfo.getCallbackId(), callbackInfo.getTopic(), e);
// 不抛出异常让上层通过超时机制处理
// 这样可以避免因为 Redis 临时故障导致整个命令执行失败
}
}
@Override
public void unregisterCallback(String callbackId) {
// 1. 获取回调信息
MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
if (callbackInfo == null) {
return;
try {
// 1. 获取回调信息需要知道 topic 才能删除索引
MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
if (callbackInfo == null) {
log.debug("回调信息不存在,无需取消注册: callbackId={}", callbackId);
return;
}
// 2. 准备 Redis key
String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
// 3. 使用 Lua 脚本原子性地取消注册回调
stringRedisTemplate.execute(
new DefaultRedisScript<>(UNREGISTER_CALLBACK_SCRIPT, Long.class),
Arrays.asList(topicKey, callbackKey),
callbackId
);
log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}",
callbackId, callbackInfo.getTopic());
} catch (Exception e) {
log.error("从Redis中取消注册MQTT回调失败: callbackId={}", callbackId, e);
// 不抛出异常取消注册失败不影响主流程
// 回调会因为 TTL 自动过期
}
// 2. Topic 索引中移除
String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
stringRedisTemplate.opsForSet().remove(topicKey, callbackId);
// 3. 删除回调信息
String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
stringRedisTemplate.delete(callbackKey);
log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}",
callbackId, callbackInfo.getTopic());
}
@Override

View File

@ -19,7 +19,7 @@ spring:
machine:
state:
store:
type: memory
type: redis
sn:
repository:
type: memory