添加MySql配置项

This commit is contained in:
孙小云 2025-12-18 15:46:16 +08:00
parent b31211e8f2
commit a17809909d
5 changed files with 192 additions and 215 deletions

12
pom.xml
View File

@ -71,5 +71,17 @@
<optional>true</optional>
</dependency>
<!-- Spring Boot Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Jackson for JSON serialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,13 +1,19 @@
package com.tuoheng.machine.mqtt.store;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 基于 Redis MQTT 回调存储实现
@ -27,192 +33,175 @@ import java.util.List;
* 1. application.properties 中配置machine.state.store.type=redis
* 2. 配置 Redis 连接信息
* 3. 实现 Redis 相关的序列化和 Pub/Sub 逻辑
*
* 注意当前为空实现需要在生产环境部署时完善
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis")
public class RedisMqttCallbackStore implements MqttCallbackStore {
// TODO: 注入 RedisTemplate StringRedisTemplate
// private final RedisTemplate<String, String> redisTemplate;
// private final StringRedisTemplate stringRedisTemplate;
// TODO: 注入 ObjectMapper 用于序列化
// private final ObjectMapper objectMapper;
private final StringRedisTemplate stringRedisTemplate;
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final ObjectMapper objectMapper;
// Redis key 前缀
private static final String CALLBACK_KEY_PREFIX = "mqtt:callback:";
private static final String TOPIC_INDEX_PREFIX = "mqtt:topic:";
private static final String NODE_CHANNEL_PREFIX = "mqtt:node:";
// TODO: 配置回调信息的过期时间可选
// private static final long EXPIRE_SECONDS = 3600; // 1小时
// 配置回调信息的过期时间
private static final long EXPIRE_SECONDS = 3600; // 1小时
public RedisMqttCallbackStore() {
log.warn("使用 Redis MQTT 回调存储实现,但当前为空实现,请在生产环境部署前完善");
public RedisMqttCallbackStore(StringRedisTemplate stringRedisTemplate,
RedisMessageListenerContainer redisMessageListenerContainer,
ObjectMapper objectMapper) {
this.stringRedisTemplate = stringRedisTemplate;
this.redisMessageListenerContainer = redisMessageListenerContainer;
this.objectMapper = objectMapper;
log.info("使用 Redis MQTT 回调存储实现");
}
@Override
public void registerCallback(MqttCallbackInfo callbackInfo) {
// TODO: 实现注册回调到 Redis
try {
// 1. MqttCallbackInfo 序列化为 JSON
// String json = objectMapper.writeValueAsString(callbackInfo);
//
// 2. 存储到 Redis Hash
// String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId();
// stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS);
//
// 3. 添加到 Topic 索引
// String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
// stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId());
// stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS);
//
// log.debug("注册MQTT回调到Redis: callbackId={}, topic={}",
// callbackInfo.getCallbackId(), callbackInfo.getTopic());
String json = objectMapper.writeValueAsString(callbackInfo);
log.warn("Redis MQTT 回调存储未实现,跳过注册: callbackId={}", callbackInfo.getCallbackId());
// 2. 存储到 Redis Hash
String callbackKey = CALLBACK_KEY_PREFIX + callbackInfo.getCallbackId();
stringRedisTemplate.opsForValue().set(callbackKey, json, EXPIRE_SECONDS, TimeUnit.SECONDS);
// 3. 添加到 Topic 索引
String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
stringRedisTemplate.opsForSet().add(topicKey, callbackInfo.getCallbackId());
stringRedisTemplate.expire(topicKey, EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("注册MQTT回调到Redis: callbackId={}, topic={}",
callbackInfo.getCallbackId(), callbackInfo.getTopic());
} catch (JsonProcessingException e) {
log.error("序列化回调信息失败: callbackId={}", callbackInfo.getCallbackId(), e);
}
}
@Override
public void unregisterCallback(String callbackId) {
// TODO: 实现从 Redis 删除回调
// 1. 获取回调信息
// MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
// if (callbackInfo == null) {
// return;
// }
//
// 2. Topic 索引中移除
// String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
// stringRedisTemplate.opsForSet().remove(topicKey, callbackId);
//
// 3. 删除回调信息
// String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
// stringRedisTemplate.delete(callbackKey);
//
// log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}",
// callbackId, callbackInfo.getTopic());
MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
if (callbackInfo == null) {
return;
}
log.warn("Redis MQTT 回调存储未实现,跳过取消注册: callbackId={}", callbackId);
// 2. Topic 索引中移除
String topicKey = TOPIC_INDEX_PREFIX + callbackInfo.getTopic();
stringRedisTemplate.opsForSet().remove(topicKey, callbackId);
// 3. 删除回调信息
String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
stringRedisTemplate.delete(callbackKey);
log.debug("从Redis中取消注册MQTT回调: callbackId={}, topic={}",
callbackId, callbackInfo.getTopic());
}
@Override
public List<MqttCallbackInfo> getCallbacksByTopic(String topic) {
// TODO: 实现从 Redis 获取指定 topic 的所有回调
// 1. Topic 索引获取所有 callbackId
// String topicKey = TOPIC_INDEX_PREFIX + topic;
// Set<String> callbackIds = stringRedisTemplate.opsForSet().members(topicKey);
// if (callbackIds == null || callbackIds.isEmpty()) {
// return new ArrayList<>();
// }
//
// 2. 批量获取回调信息
// List<MqttCallbackInfo> callbacks = new ArrayList<>();
// for (String callbackId : callbackIds) {
// MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
// if (callbackInfo != null) {
// callbacks.add(callbackInfo);
// }
// }
// return callbacks;
log.warn("Redis MQTT 回调存储未实现,返回空列表: topic={}", topic);
String topicKey = TOPIC_INDEX_PREFIX + topic;
Set<String> callbackIds = stringRedisTemplate.opsForSet().members(topicKey);
if (callbackIds == null || callbackIds.isEmpty()) {
return new ArrayList<>();
}
// 2. 批量获取回调信息
List<MqttCallbackInfo> callbacks = new ArrayList<>();
for (String callbackId : callbackIds) {
MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
if (callbackInfo != null) {
callbacks.add(callbackInfo);
}
}
return callbacks;
}
@Override
public MqttCallbackInfo getCallbackById(String callbackId) {
// TODO: 实现从 Redis 获取回调信息
// String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
// String json = stringRedisTemplate.opsForValue().get(callbackKey);
// if (json == null) {
// return null;
// }
//
// try {
// return objectMapper.readValue(json, MqttCallbackInfo.class);
// } catch (JsonProcessingException e) {
// log.error("反序列化回调信息失败: callbackId={}", callbackId, e);
// return null;
// }
log.warn("Redis MQTT 回调存储未实现,返回 null: callbackId={}", callbackId);
String callbackKey = CALLBACK_KEY_PREFIX + callbackId;
String json = stringRedisTemplate.opsForValue().get(callbackKey);
if (json == null) {
return null;
}
try {
return objectMapper.readValue(json, MqttCallbackInfo.class);
} catch (JsonProcessingException e) {
log.error("反序列化回调信息失败: callbackId={}", callbackId, e);
return null;
}
}
@Override
public List<MqttCallbackInfo> getAllCallbacks() {
// TODO: 实现获取所有回调信息用于清理超时回调
// 1. 扫描所有 mqtt:callback:* key
// Set<String> keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*");
// if (keys == null || keys.isEmpty()) {
// return new ArrayList<>();
// }
//
// 2. 批量获取回调信息
// List<MqttCallbackInfo> callbacks = new ArrayList<>();
// for (String key : keys) {
// String callbackId = key.substring(CALLBACK_KEY_PREFIX.length());
// MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
// if (callbackInfo != null) {
// callbacks.add(callbackInfo);
// }
// }
// return callbacks;
log.warn("Redis MQTT 回调存储未实现,返回空列表");
Set<String> keys = stringRedisTemplate.keys(CALLBACK_KEY_PREFIX + "*");
if (keys == null || keys.isEmpty()) {
return new ArrayList<>();
}
// 2. 批量获取回调信息
List<MqttCallbackInfo> callbacks = new ArrayList<>();
for (String key : keys) {
String callbackId = key.substring(CALLBACK_KEY_PREFIX.length());
MqttCallbackInfo callbackInfo = getCallbackById(callbackId);
if (callbackInfo != null) {
callbacks.add(callbackInfo);
}
}
return callbacks;
}
@Override
public void publishMessageToNode(String nodeId, String callbackId, String messageBody) {
// TODO: 实现通过 Redis Pub/Sub 发布消息到指定节点
try {
// 1. 构造消息体
// Map<String, String> message = new HashMap<>();
// message.put("callbackId", callbackId);
// message.put("messageBody", messageBody);
//
// 2. 序列化消息
// String json = objectMapper.writeValueAsString(message);
//
// 3. 发布到节点频道
// String channel = NODE_CHANNEL_PREFIX + nodeId;
// stringRedisTemplate.convertAndSend(channel, json);
//
// log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}",
// nodeId, callbackId, channel);
Map<String, String> message = new HashMap<>();
message.put("callbackId", callbackId);
message.put("messageBody", messageBody);
log.warn("Redis MQTT 回调存储未实现,跳过发布消息: nodeId={}, callbackId={}", nodeId, callbackId);
// 2. 序列化消息
String json = objectMapper.writeValueAsString(message);
// 3. 发布到节点频道
String channel = NODE_CHANNEL_PREFIX + nodeId;
stringRedisTemplate.convertAndSend(channel, json);
log.debug("发布消息到节点: nodeId={}, callbackId={}, channel={}",
nodeId, callbackId, channel);
} catch (JsonProcessingException e) {
log.error("序列化节点消息失败: nodeId={}, callbackId={}", nodeId, callbackId, e);
}
}
@Override
public void subscribeNodeMessages(String nodeId, NodeMessageListener messageListener) {
// TODO: 实现订阅当前节点的 Redis Pub/Sub 消息
// 1. 创建消息监听器
// MessageListener redisMessageListener = (message, pattern) -> {
// try {
// String json = new String(message.getBody());
// Map<String, String> data = objectMapper.readValue(json,
// new TypeReference<Map<String, String>>() {});
//
// String callbackId = data.get("callbackId");
// String messageBody = data.get("messageBody");
//
// messageListener.onMessage(callbackId, messageBody);
// } catch (Exception e) {
// log.error("处理Redis Pub/Sub消息失败", e);
// }
// };
//
// 2. 订阅节点频道
// String channel = NODE_CHANNEL_PREFIX + nodeId;
// redisTemplate.getConnectionFactory().getConnection()
// .subscribe(redisMessageListener, channel.getBytes());
//
// log.info("订阅节点消息: nodeId={}, channel={}", nodeId, channel);
MessageListener redisMessageListener = (message, pattern) -> {
try {
String json = new String(message.getBody());
Map<String, String> data = objectMapper.readValue(json,
new TypeReference<Map<String, String>>() {});
log.warn("Redis MQTT 回调存储未实现,跳过订阅节点消息: nodeId={}", nodeId);
String callbackId = data.get("callbackId");
String messageBody = data.get("messageBody");
messageListener.onMessage(callbackId, messageBody);
} catch (Exception e) {
log.error("处理Redis Pub/Sub消息失败", e);
}
};
// 2. 订阅节点频道
String channel = NODE_CHANNEL_PREFIX + nodeId;
redisMessageListenerContainer.addMessageListener(redisMessageListener, new ChannelTopic(channel));
log.info("订阅节点消息: nodeId={}, channel={}", nodeId, channel);
}
}

View File

@ -12,10 +12,14 @@ import java.util.concurrent.ConcurrentHashMap;
* 适用于单节点部署或开发测试环境
*
* 注意这是默认实现当没有配置数据库时使用
*
* 使用方式
* 1. 不配置 machine.sn.repository.type默认使用内存实现
* 2. 或在 application.properties 中配置machine.sn.repository.type=memory
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "memory", matchIfMissing = true)
@ConditionalOnProperty(name = "machine.sn.repository.type", havingValue = "memory", matchIfMissing = true)
public class InMemorySnVendorMappingRepository implements SnVendorMappingRepository {
/**

View File

@ -18,7 +18,7 @@ import org.springframework.stereotype.Component;
* ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='SN到厂家类型映射表';
*
* 使用方式
* 1. application.properties 中配置machine.state.store.type=redis
* 1. application.properties 中配置machine.sn.repository.type=mysql
* 2. 配置 MySQL 数据源
* 3. 创建上述表结构
* 4. 实现下面的 CRUD 方法
@ -27,7 +27,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis")
@ConditionalOnProperty(name = "machine.sn.repository.type", havingValue = "mysql")
public class MysqlSnVendorMappingRepository implements SnVendorMappingRepository {
// TODO: 注入 JdbcTemplate MyBatis Mapper

View File

@ -3,8 +3,11 @@ package com.tuoheng.machine.vendor.store;
import com.tuoheng.machine.vendor.repository.SnVendorMappingRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 基于 Redis + Repository SN 到厂家类型映射存储实现
* 适用于多节点部署的生产环境
@ -24,16 +27,13 @@ import org.springframework.stereotype.Component;
* 2. 配置 Redis 连接信息
* 3. Repository 会根据配置自动选择实现内存/MySQL
* 4. 实现 Redis 的缓存逻辑
*
* 注意当前为空实现需要在生产环境部署时完善
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "machine.state.store.type", havingValue = "redis")
public class RedisSnVendorMappingStore implements SnVendorMappingStore {
// TODO: 注入 RedisTemplate
// private final StringRedisTemplate redisTemplate;
private final StringRedisTemplate redisTemplate;
/**
* 持久化存储层支持内存/MySQL等实现
@ -43,102 +43,74 @@ public class RedisSnVendorMappingStore implements SnVendorMappingStore {
// Redis key 前缀
private static final String KEY_PREFIX = "machine:sn:vendor:";
// TODO: 配置缓存过期时间
// private static final long CACHE_EXPIRE_SECONDS = 86400; // 24小时
// 配置缓存过期时间
private static final long CACHE_EXPIRE_SECONDS = 86400; // 24小时
public RedisSnVendorMappingStore(SnVendorMappingRepository repository) {
public RedisSnVendorMappingStore(StringRedisTemplate redisTemplate,
SnVendorMappingRepository repository) {
this.redisTemplate = redisTemplate;
this.repository = repository;
log.warn("使用 Redis+Repository SN 映射存储实现,但当前为空实现,请在生产环境部署前完善");
log.info("使用 Redis+Repository SN 映射存储实现");
log.info("持久化层实现: {}", repository.getClass().getSimpleName());
}
@Override
public String getVendorType(String sn) {
// TODO: 实现从 Redis + MySQL 获取映射
// 1. 先从 Redis 缓存获取
// String key = KEY_PREFIX + sn;
// String vendorType = redisTemplate.opsForValue().get(key);
// if (vendorType != null) {
// log.debug("从 Redis 缓存获取 SN 映射: sn={}, vendorType={}", sn, vendorType);
// return vendorType;
// }
//
// 2. Redis 没有 MySQL 数据库获取
// try {
// vendorType = jdbcTemplate.queryForObject(
// "SELECT vendor_type FROM sn_vendor_mapping WHERE sn = ?",
// String.class,
// sn
// );
//
// if (vendorType != null) {
// // 3. 获取到后存入 Redis 缓存
// redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
// log.debug("从 MySQL 获取 SN 映射并缓存到 Redis: sn={}, vendorType={}", sn, vendorType);
// return vendorType;
// }
// } catch (EmptyResultDataAccessException e) {
// log.debug("MySQL 中不存在 SN 映射: sn={}", sn);
// }
//
// return null;
String key = KEY_PREFIX + sn;
String vendorType = redisTemplate.opsForValue().get(key);
if (vendorType != null) {
log.debug("从 Redis 缓存获取 SN 映射: sn={}, vendorType={}", sn, vendorType);
return vendorType;
}
log.warn("Redis+MySQL SN 映射存储未实现,返回 null: sn={}", sn);
// 2. Redis 没有 Repository 持久化层获取
vendorType = repository.findVendorTypeBySn(sn);
if (vendorType != null) {
// 3. 获取到后存入 Redis 缓存
redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("从 Repository 获取 SN 映射并缓存到 Redis: sn={}, vendorType={}", sn, vendorType);
return vendorType;
}
log.debug("Repository 中不存在 SN 映射: sn={}", sn);
return null;
}
@Override
public void saveMapping(String sn, String vendorType) {
// TODO: 实现保存映射到 Redis + MySQL
// 1. 保存到 MySQL 数据库持久化
// jdbcTemplate.update(
// "INSERT INTO sn_vendor_mapping (sn, vendor_type) VALUES (?, ?) " +
// "ON DUPLICATE KEY UPDATE vendor_type = ?, updated_at = CURRENT_TIMESTAMP",
// sn, vendorType, vendorType
// );
//
// 2. 保存到 Redis 缓存
// String key = KEY_PREFIX + sn;
// redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
//
// log.debug("保存 SN 映射到 Redis+MySQL: sn={}, vendorType={}", sn, vendorType);
// 1. 保存到 Repository 持久化层
repository.save(sn, vendorType);
log.warn("Redis+MySQL SN 映射存储未实现,跳过保存: sn={}, vendorType={}", sn, vendorType);
// 2. 保存到 Redis 缓存
String key = KEY_PREFIX + sn;
redisTemplate.opsForValue().set(key, vendorType, CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS);
log.debug("保存 SN 映射到 Redis+Repository: sn={}, vendorType={}", sn, vendorType);
}
@Override
public void removeMapping(String sn) {
// TODO: 实现从 Redis + MySQL 删除映射
// 1. MySQL 删除
// jdbcTemplate.update("DELETE FROM sn_vendor_mapping WHERE sn = ?", sn);
//
// 2. Redis 删除
// String key = KEY_PREFIX + sn;
// redisTemplate.delete(key);
//
// log.debug("从 Redis+MySQL 中移除 SN 映射: sn={}", sn);
// 1. Repository 删除
repository.delete(sn);
log.warn("Redis+MySQL SN 映射存储未实现,跳过删除: sn={}", sn);
// 2. Redis 删除
String key = KEY_PREFIX + sn;
redisTemplate.delete(key);
log.debug("从 Redis+Repository 中移除 SN 映射: sn={}", sn);
}
@Override
public boolean exists(String sn) {
// TODO: 实现检查映射是否存在
// 1. 先检查 Redis
// String key = KEY_PREFIX + sn;
// if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
// return true;
// }
//
// 2. 再检查 MySQL
// Integer count = jdbcTemplate.queryForObject(
// "SELECT COUNT(*) FROM sn_vendor_mapping WHERE sn = ?",
// Integer.class,
// sn
// );
// return count != null && count > 0;
String key = KEY_PREFIX + sn;
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
return true;
}
log.warn("Redis+MySQL SN 映射存储未实现,返回 false: sn={}", sn);
return false;
// 2. 再检查 Repository
return repository.exists(sn);
}
}