From a17809909db91b39338407ad91086264c05b2122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Thu, 18 Dec 2025 15:46:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0MySql=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 12 + .../mqtt/store/RedisMqttCallbackStore.java | 263 +++++++++--------- .../InMemorySnVendorMappingRepository.java | 6 +- .../MysqlSnVendorMappingRepository.java | 4 +- .../store/RedisSnVendorMappingStore.java | 122 ++++---- 5 files changed, 192 insertions(+), 215 deletions(-) diff --git a/pom.xml b/pom.xml index 43a19fa..d58dea7 100644 --- a/pom.xml +++ b/pom.xml @@ -71,5 +71,17 @@ true + + + org.springframework.boot + spring-boot-starter-data-redis + + + + + com.fasterxml.jackson.core + jackson-databind + + diff --git a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java index 39f5f7e..099c68c 100644 --- a/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java +++ b/src/main/java/com/tuoheng/machine/mqtt/store/RedisMqttCallbackStore.java @@ -1,13 +1,19 @@ package com.tuoheng.machine.mqtt.store; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.TimeUnit; /** * 基于 Redis 的 MQTT 回调存储实现 @@ -27,192 +33,175 @@ import java.util.List; * 1. 在 application.properties 中配置:machine.state.store.type=redis * 2. 配置 Redis 连接信息 * 3. 实现 Redis 相关的序列化和 Pub/Sub 逻辑 - * - * 注意:当前为空实现,需要在生产环境部署时完善 */ @Slf4j @Component @ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis") public class RedisMqttCallbackStore implements MqttCallbackStore { - // TODO: 注入 RedisTemplate 和 StringRedisTemplate - // private final RedisTemplate redisTemplate; - // private final StringRedisTemplate stringRedisTemplate; - - // TODO: 注入 ObjectMapper 用于序列化 - // private final ObjectMapper objectMapper; + private final StringRedisTemplate stringRedisTemplate; + private final RedisMessageListenerContainer redisMessageListenerContainer; + private final ObjectMapper objectMapper; // Redis key 前缀 private static final String CALLBACK_KEY_PREFIX = "mqtt:callback:"; private static final String TOPIC_INDEX_PREFIX = "mqtt:topic:"; private static final String NODE_CHANNEL_PREFIX = "mqtt:node:"; - // TODO: 配置回调信息的过期时间(可选) - // private static final long EXPIRE_SECONDS = 3600; // 1小时 + // 配置回调信息的过期时间 + private static final long EXPIRE_SECONDS = 3600; // 1小时 - public RedisMqttCallbackStore() { - log.warn("使用 Redis MQTT 回调存储实现,但当前为空实现,请在生产环境部署前完善"); + public RedisMqttCallbackStore(StringRedisTemplate stringRedisTemplate, + RedisMessageListenerContainer redisMessageListenerContainer, + ObjectMapper objectMapper) { + this.stringRedisTemplate = stringRedisTemplate; + this.redisMessageListenerContainer = redisMessageListenerContainer; + this.objectMapper = objectMapper; + log.info("使用 Redis MQTT 回调存储实现"); } @Override public void registerCallback(MqttCallbackInfo callbackInfo) { - // TODO: 实现注册回调到 Redis - // 1. 将 MqttCallbackInfo 序列化为 JSON - // String json = objectMapper.writeValueAsString(callbackInfo); - // - // 2. 存储到 Redis Hash - // String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId(); - // stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS); - // - // 3. 添加到 Topic 索引 - // String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); - // stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId()); - // stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS); - // - // log.debug("注册MQTT回调到Redis: callbackId={}, topic={}", - // callbackInfo.getCallbackId(), callbackInfo.getTopic()); + try { + // 1. 将 MqttCallbackInfo 序列化为 JSON + String json = objectMapper.writeValueAsString(callbackInfo); - log.warn("Redis MQTT 回调存储未实现,跳过注册: callbackId={}", callbackInfo.getCallbackId()); + // 2. 存储到 Redis Hash + String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId(); + stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS); + + // 3. 添加到 Topic 索引 + String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); + stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId()); + stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS); + + log.debug("注册MQTT回调到Redis: callbackId={}, topic={}", + callbackInfo.getCallbackId(), callbackInfo.getTopic()); + } catch (JsonProcessingException e) { + log.error("序列化回调信息失败: callbackId={}", callbackInfo.getCallbackId(), e); + } } @Override public void unregisterCallback(String callbackId) { - // TODO: 实现从 Redis 删除回调 // 1. 获取回调信息 - // MqttCallbackInfo callbackInfo = getCallbackById(callbackId); - // if (callbackInfo == null) { - // return; - // } - // - // 2. 从 Topic 索引中移除 - // String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); - // stringRedisTemplate.opsForSet().remove(topicKey, callbackId); - // - // 3. 删除回调信息 - // String callbackKey = CALLBACK_KEY_PREFIX + callbackId; - // stringRedisTemplate.delete(callbackKey); - // - // log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}", - // callbackId, callbackInfo.getTopic()); + MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + if (callbackInfo == null) { + return; + } - log.warn("Redis MQTT 回调存储未实现,跳过取消注册: callbackId={}", callbackId); + // 2. 从 Topic 索引中移除 + String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic(); + stringRedisTemplate.opsForSet().remove(topicKey, callbackId); + + // 3. 删除回调信息 + String callbackKey = CALLBACK_KEY_PREFIX + callbackId; + stringRedisTemplate.delete(callbackKey); + + log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}", + callbackId, callbackInfo.getTopic()); } @Override public List getCallbacksByTopic(String topic) { - // TODO: 实现从 Redis 获取指定 topic 的所有回调 // 1. 从 Topic 索引获取所有 callbackId - // String topicKey = TOPIC_INDEX_PREFIX + topic; - // Set callbackIds = stringRedisTemplate.opsForSet().members(topicKey); - // if (callbackIds == null || callbackIds.isEmpty()) { - // return new ArrayList<>(); - // } - // - // 2. 批量获取回调信息 - // List callbacks = new ArrayList<>(); - // for (String callbackId : callbackIds) { - // MqttCallbackInfo callbackInfo = getCallbackById(callbackId); - // if (callbackInfo != null) { - // callbacks.add(callbackInfo); - // } - // } - // return callbacks; + String topicKey = TOPIC_INDEX_PREFIX + topic; + Set callbackIds = stringRedisTemplate.opsForSet().members(topicKey); + if (callbackIds == null || callbackIds.isEmpty()) { + return new ArrayList<>(); + } - log.warn("Redis MQTT 回调存储未实现,返回空列表: topic={}", topic); - return new ArrayList<>(); + // 2. 批量获取回调信息 + List callbacks = new ArrayList<>(); + for (String callbackId : callbackIds) { + MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + if (callbackInfo != null) { + callbacks.add(callbackInfo); + } + } + return callbacks; } @Override public MqttCallbackInfo getCallbackById(String callbackId) { - // TODO: 实现从 Redis 获取回调信息 - // String callbackKey = CALLBACK_KEY_PREFIX + callbackId; - // String json = stringRedisTemplate.opsForValue().get(callbackKey); - // if (json == null) { - // return null; - // } - // - // try { - // return objectMapper.readValue(json, MqttCallbackInfo.class); - // } catch (JsonProcessingException e) { - // log.error("反序列化回调信息失败: callbackId={}", callbackId, e); - // return null; - // } + String callbackKey = CALLBACK_KEY_PREFIX + callbackId; + String json = stringRedisTemplate.opsForValue().get(callbackKey); + if (json == null) { + return null; + } - log.warn("Redis MQTT 回调存储未实现,返回 null: callbackId={}", callbackId); - return null; + try { + return objectMapper.readValue(json, MqttCallbackInfo.class); + } catch (JsonProcessingException e) { + log.error("反序列化回调信息失败: callbackId={}", callbackId, e); + return null; + } } @Override public List getAllCallbacks() { - // TODO: 实现获取所有回调信息(用于清理超时回调) // 1. 扫描所有 mqtt:callback:* 的 key - // Set keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*"); - // if (keys == null || keys.isEmpty()) { - // return new ArrayList<>(); - // } - // - // 2. 批量获取回调信息 - // List callbacks = new ArrayList<>(); - // for (String key : keys) { - // String callbackId = key.substring(CALLBACK_KEY_PREFIX.length()); - // MqttCallbackInfo callbackInfo = getCallbackById(callbackId); - // if (callbackInfo != null) { - // callbacks.add(callbackInfo); - // } - // } - // return callbacks; + Set keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*"); + if (keys == null || keys.isEmpty()) { + return new ArrayList<>(); + } - log.warn("Redis MQTT 回调存储未实现,返回空列表"); - return new ArrayList<>(); + // 2. 批量获取回调信息 + List callbacks = new ArrayList<>(); + for (String key : keys) { + String callbackId = key.substring(CALLBACK_KEY_PREFIX.length()); + MqttCallbackInfo callbackInfo = getCallbackById(callbackId); + if (callbackInfo != null) { + callbacks.add(callbackInfo); + } + } + return callbacks; } @Override public void publishMessageToNode(String nodeId, String callbackId, String messageBody) { - // TODO: 实现通过 Redis Pub/Sub 发布消息到指定节点 - // 1. 构造消息体 - // Map message = new HashMap<>(); - // message.put("callbackId", callbackId); - // message.put("messageBody", messageBody); - // - // 2. 序列化消息 - // String json = objectMapper.writeValueAsString(message); - // - // 3. 发布到节点频道 - // String channel = NODE_CHANNEL_PREFIX + nodeId; - // stringRedisTemplate.convertAndSend(channel, json); - // - // log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}", - // nodeId, callbackId, channel); + try { + // 1. 构造消息体 + Map message = new HashMap<>(); + message.put("callbackId", callbackId); + message.put("messageBody", messageBody); - log.warn("Redis MQTT 回调存储未实现,跳过发布消息: nodeId={}, callbackId={}", nodeId, callbackId); + // 2. 序列化消息 + String json = objectMapper.writeValueAsString(message); + + // 3. 发布到节点频道 + String channel = NODE_CHANNEL_PREFIX + nodeId; + stringRedisTemplate.convertAndSend(channel, json); + + log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}", + nodeId, callbackId, channel); + } catch (JsonProcessingException e) { + log.error("序列化节点消息失败: nodeId={}, callbackId={}", nodeId, callbackId, e); + } } @Override public void subscribeNodeMessages(String nodeId, NodeMessageListener messageListener) { - // TODO: 实现订阅当前节点的 Redis Pub/Sub 消息 // 1. 创建消息监听器 - // MessageListener redisMessageListener = (message, pattern) -> { - // try { - // String json = new String(message.getBody()); - // Map data = objectMapper.readValue(json, - // new TypeReference>() {}); - // - // String callbackId = data.get("callbackId"); - // String messageBody = data.get("messageBody"); - // - // messageListener.onMessage(callbackId, messageBody); - // } catch (Exception e) { - // log.error("处理Redis Pub/Sub消息失败", e); - // } - // }; - // - // 2. 订阅节点频道 - // String channel = NODE_CHANNEL_PREFIX + nodeId; - // redisTemplate.getConnectionFactory().getConnection() - // .subscribe(redisMessageListener, channel.getBytes()); - // - // log.info("订阅节点消息: nodeId={}, channel={}", nodeId, channel); + MessageListener redisMessageListener = (message, pattern) -> { + try { + String json = new String(message.getBody()); + Map data = objectMapper.readValue(json, + new TypeReference>() {}); - log.warn("Redis MQTT 回调存储未实现,跳过订阅节点消息: nodeId={}", nodeId); + String callbackId = data.get("callbackId"); + String messageBody = data.get("messageBody"); + + messageListener.onMessage(callbackId, messageBody); + } catch (Exception e) { + log.error("处理Redis Pub/Sub消息失败", e); + } + }; + + // 2. 订阅节点频道 + String channel = NODE_CHANNEL_PREFIX + nodeId; + redisMessageListenerContainer.addMessageListener(redisMessageListener, new ChannelTopic(channel)); + + log.info("订阅节点消息: nodeId={}, channel={}", nodeId, channel); } } \ No newline at end of file diff --git a/src/main/java/com/tuoheng/machine/vendor/repository/InMemorySnVendorMappingRepository.java b/src/main/java/com/tuoheng/machine/vendor/repository/InMemorySnVendorMappingRepository.java index a0b6496..b8ad082 100644 --- a/src/main/java/com/tuoheng/machine/vendor/repository/InMemorySnVendorMappingRepository.java +++ b/src/main/java/com/tuoheng/machine/vendor/repository/InMemorySnVendorMappingRepository.java @@ -12,10 +12,14 @@ import java.util.concurrent.ConcurrentHashMap; * 适用于单节点部署或开发测试环境 * * 注意:这是默认实现,当没有配置数据库时使用 + * + * 使用方式: + * 1. 不配置 machine.sn.repository.type(默认使用内存实现) + * 2. 或在 application.properties 中配置:machine.sn.repository.type=memory */ @Slf4j @Component -@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "memory", matchIfMissing = true) +@ConditionalOnProperty(name = "machine.sn.repository.type", havingValue = "memory", matchIfMissing = true) public class InMemorySnVendorMappingRepository implements SnVendorMappingRepository { /** diff --git a/src/main/java/com/tuoheng/machine/vendor/repository/MysqlSnVendorMappingRepository.java b/src/main/java/com/tuoheng/machine/vendor/repository/MysqlSnVendorMappingRepository.java index 652305e..12e6fce 100644 --- a/src/main/java/com/tuoheng/machine/vendor/repository/MysqlSnVendorMappingRepository.java +++ b/src/main/java/com/tuoheng/machine/vendor/repository/MysqlSnVendorMappingRepository.java @@ -18,7 +18,7 @@ import org.springframework.stereotype.Component; * ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='SN到厂家类型映射表'; * * 使用方式: - * 1. 在 application.properties 中配置:machine.state.store.type=redis + * 1. 在 application.properties 中配置:machine.sn.repository.type=mysql * 2. 配置 MySQL 数据源 * 3. 创建上述表结构 * 4. 实现下面的 CRUD 方法 @@ -27,7 +27,7 @@ import org.springframework.stereotype.Component; */ @Slf4j @Component -@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis") +@ConditionalOnProperty(name = "machine.sn.repository.type", havingValue = "mysql") public class MysqlSnVendorMappingRepository implements SnVendorMappingRepository { // TODO: 注入 JdbcTemplate 或 MyBatis Mapper diff --git a/src/main/java/com/tuoheng/machine/vendor/store/RedisSnVendorMappingStore.java b/src/main/java/com/tuoheng/machine/vendor/store/RedisSnVendorMappingStore.java index ed18618..d12ebc5 100644 --- a/src/main/java/com/tuoheng/machine/vendor/store/RedisSnVendorMappingStore.java +++ b/src/main/java/com/tuoheng/machine/vendor/store/RedisSnVendorMappingStore.java @@ -3,8 +3,11 @@ package com.tuoheng.machine.vendor.store; import com.tuoheng.machine.vendor.repository.SnVendorMappingRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * 基于 Redis + Repository 的 SN 到厂家类型映射存储实现 * 适用于多节点部署的生产环境 @@ -24,16 +27,13 @@ import org.springframework.stereotype.Component; * 2. 配置 Redis 连接信息 * 3. Repository 会根据配置自动选择实现(内存/MySQL) * 4. 实现 Redis 的缓存逻辑 - * - * 注意:当前为空实现,需要在生产环境部署时完善 */ @Slf4j @Component @ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis") public class RedisSnVendorMappingStore implements SnVendorMappingStore { - // TODO: 注入 RedisTemplate - // private final StringRedisTemplate redisTemplate; + private final StringRedisTemplate redisTemplate; /** * 持久化存储层(支持内存/MySQL等实现) @@ -43,102 +43,74 @@ public class RedisSnVendorMappingStore implements SnVendorMappingStore { // Redis key 前缀 private static final String KEY_PREFIX = "machine:sn:vendor:"; - // TODO: 配置缓存过期时间 - // private static final long CACHE_EXPIRE_SECONDS = 86400; // 24小时 + // 配置缓存过期时间 + private static final long CACHE_EXPIRE_SECONDS = 86400; // 24小时 - public RedisSnVendorMappingStore(SnVendorMappingRepository repository) { + public RedisSnVendorMappingStore(StringRedisTemplate redisTemplate, + SnVendorMappingRepository repository) { + this.redisTemplate = redisTemplate; this.repository = repository; - log.warn("使用 Redis+Repository SN 映射存储实现,但当前为空实现,请在生产环境部署前完善"); + log.info("使用 Redis+Repository SN 映射存储实现"); log.info("持久化层实现: {}", repository.getClass().getSimpleName()); } @Override public String getVendorType(String sn) { - // TODO: 实现从 Redis + MySQL 获取映射 // 1. 先从 Redis 缓存获取 - // String key = KEY_PREFIX + sn; - // String vendorType = redisTemplate.opsForValue().get(key); - // if (vendorType != null) { - // log.debug("从 Redis 缓存获取 SN 映射: sn={}, vendorType={}", sn, vendorType); - // return vendorType; - // } - // - // 2. Redis 没有,从 MySQL 数据库获取 - // try { - // vendorType = jdbcTemplate.queryForObject( - // "SELECT vendor_type FROM sn_vendor_mapping WHERE sn = ?", - // String.class, - // sn - // ); - // - // if (vendorType != null) { - // // 3. 获取到后存入 Redis 缓存 - // redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS); - // log.debug("从 MySQL 获取 SN 映射并缓存到 Redis: sn={}, vendorType={}", sn, vendorType); - // return vendorType; - // } - // } catch (EmptyResultDataAccessException e) { - // log.debug("MySQL 中不存在 SN 映射: sn={}", sn); - // } - // - // return null; + String key = KEY_PREFIX + sn; + String vendorType = redisTemplate.opsForValue().get(key); + if (vendorType != null) { + log.debug("从 Redis 缓存获取 SN 映射: sn={}, vendorType={}", sn, vendorType); + return vendorType; + } - log.warn("Redis+MySQL SN 映射存储未实现,返回 null: sn={}", sn); + // 2. Redis 没有,从 Repository 持久化层获取 + vendorType = repository.findVendorTypeBySn(sn); + + if (vendorType != null) { + // 3. 获取到后存入 Redis 缓存 + redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS); + log.debug("从 Repository 获取 SN 映射并缓存到 Redis: sn={}, vendorType={}", sn, vendorType); + return vendorType; + } + + log.debug("Repository 中不存在 SN 映射: sn={}", sn); return null; } @Override public void saveMapping(String sn, String vendorType) { - // TODO: 实现保存映射到 Redis + MySQL - // 1. 保存到 MySQL 数据库(持久化) - // jdbcTemplate.update( - // "INSERT INTO sn_vendor_mapping (sn, vendor_type) VALUES (?, ?) " + - // "ON DUPLICATE KEY UPDATE vendor_type = ?, updated_at = CURRENT_TIMESTAMP", - // sn, vendorType, vendorType - // ); - // - // 2. 保存到 Redis 缓存 - // String key = KEY_PREFIX + sn; - // redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS); - // - // log.debug("保存 SN 映射到 Redis+MySQL: sn={}, vendorType={}", sn, vendorType); + // 1. 保存到 Repository 持久化层 + repository.save(sn, vendorType); - log.warn("Redis+MySQL SN 映射存储未实现,跳过保存: sn={}, vendorType={}", sn, vendorType); + // 2. 保存到 Redis 缓存 + String key = KEY_PREFIX + sn; + redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS); + + log.debug("保存 SN 映射到 Redis+Repository: sn={}, vendorType={}", sn, vendorType); } @Override public void removeMapping(String sn) { - // TODO: 实现从 Redis + MySQL 删除映射 - // 1. 从 MySQL 删除 - // jdbcTemplate.update("DELETE FROM sn_vendor_mapping WHERE sn = ?", sn); - // - // 2. 从 Redis 删除 - // String key = KEY_PREFIX + sn; - // redisTemplate.delete(key); - // - // log.debug("从 Redis+MySQL 中移除 SN 映射: sn={}", sn); + // 1. 从 Repository 删除 + repository.delete(sn); - log.warn("Redis+MySQL SN 映射存储未实现,跳过删除: sn={}", sn); + // 2. 从 Redis 删除 + String key = KEY_PREFIX + sn; + redisTemplate.delete(key); + + log.debug("从 Redis+Repository 中移除 SN 映射: sn={}", sn); } @Override public boolean exists(String sn) { - // TODO: 实现检查映射是否存在 // 1. 先检查 Redis - // String key = KEY_PREFIX + sn; - // if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) { - // return true; - // } - // - // 2. 再检查 MySQL - // Integer count = jdbcTemplate.queryForObject( - // "SELECT COUNT(*) FROM sn_vendor_mapping WHERE sn = ?", - // Integer.class, - // sn - // ); - // return count != null && count > 0; + String key = KEY_PREFIX + sn; + if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) { + return true; + } - log.warn("Redis+MySQL SN 映射存储未实现,返回 false: sn={}", sn); - return false; + // 2. 再检查 Repository + return repository.exists(sn); } } \ No newline at end of file