单节点部署(默认) # 使用内存存储(默认配置) machine.state.store.type=memory 多节点部署 # 切换到 Redis 存储 machine.state.store.type=redis # 配置节点ID(可选,不配置会自动生成) machine.node.id=node-1 # Redis 配置 spring.redis.host=localhost spring.redis.port=6379 spring.redis.password=your-password ┌─────────────────────────────────────────────────────────────────┐ │ 步骤1: 节点A 执行命令并注册回调 │ └─────────────────────────────────────────────────────────────────┘ 节点A: executeCommand(TAKE_OFF) ↓ 节点A: registerCallback(topic="dji/SN9527/response", ...) ↓ 节点A: MqttCallbackStore.registerCallback() ↓ Redis: 存储回调信息(使用两个 Key 的原因:性能优化) 【Key 1】回调详细信息(Hash 结构) - Key: mqtt:callback:{callbackId} - Value: {callbackId, topic, nodeId="nodeA", timeoutMs, registerTime, ...} - 作用: 存储单个回调的完整信息 - 查询: O(1) 时间复杂度,通过 callbackId 直接获取 【Key 2】Topic 索引(Set 结构) - Key: mqtt:topic:dji/SN9527/response - Value: Set // 例如: ["abc-123", "def-456", "ghi-789"] - 作用: 快速查询等待某个 topic 的所有回调 - 查询: O(1) 时间复杂度,直接获取 callbackId 列表 【为什么需要两个 Key?】 如果只用一个 Key 存储所有回调,查询时需要遍历所有回调并过滤 topic, 时间复杂度为 O(n)。使用 Topic 索引后,可以直接获取目标回调列表, 时间复杂度降为 O(1),大幅提升性能。 【示例】 假设有 3 个回调: - callbackId="abc-123", topic="dji/SN9527/response", nodeId="nodeA" - callbackId="def-456", topic="dji/SN9527/state", nodeId="nodeB" - callbackId="ghi-789", topic="dji/SN9527/response", nodeId="nodeA" Redis 存储结构: mqtt:callback:abc-123 → {callbackId:"abc-123", topic:"dji/SN9527/response", nodeId:"nodeA"} mqtt:callback:def-456 → {callbackId:"def-456", topic:"dji/SN9527/state", nodeId:"nodeB"} mqtt:callback:ghi-789 → {callbackId:"ghi-789", topic:"dji/SN9527/response", nodeId:"nodeA"} mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"] mqtt:topic:dji/SN9527/state → ["def-456"] 查询 topic="dji/SN9527/response" 的回调: 1. 从索引获取: SMEMBERS mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"] 2. 批量获取详情: MGET mqtt:callback:abc-123 mqtt:callback:ghi-789 3. 总耗时: O(1) + O(k),k 是该 topic 的回调数量(通常很小) 【Redis 数据清理时机】 Redis 中的回调数据有两种清理机制: ┌─────────────────────────────────────────────────────────────┐ │ 1️⃣ 主动清理(业务逻辑触发) │ └─────────────────────────────────────────────────────────────┘ 触发时机: ✅ 回调成功执行后(TransactionExecutor 的 finally 块) ✅ 回调超时后(TransactionExecutor 的 finally 块) ✅ handleMessage 检测到超时(转发前检查) 清理操作: unregisterCallback(callbackId) ↓ 1. 获取回调信息: GET mqtt:callback:{callbackId} 2. 删除回调信息: DEL mqtt:callback:{callbackId} 3. 从索引中移除: SREM mqtt:topic:{topic} {callbackId} 示例: T0: 注册回调,超时时间 10 秒 T5: 收到 MQTT 响应,回调执行成功 T5: 立即清理 Redis 数据 ✅ - DEL mqtt:callback:abc-123 - SREM mqtt:topic:dji/SN9527/response abc-123 ┌─────────────────────────────────────────────────────────────┐ │ 2️⃣ 被动清理(Redis TTL 自动过期) │ └─────────────────────────────────────────────────────────────┘ 作用:兜底机制,防止异常情况下的数据残留 设置方式: // 注册回调时设置 TTL SET mqtt:callback:{callbackId} {json} EX 3600 // 1小时后自动过期 EXPIRE mqtt:topic:{topic} 3600 // 1小时后自动过期 触发时机: ⚠️ 应用异常崩溃,主动清理未执行 ⚠️ 网络分区,无法删除 Redis 数据 ⚠️ 代码 Bug,主动清理失败 示例: T0: 注册回调,TTL=3600秒(1小时) T5: 应用崩溃,主动清理未执行 ❌ T3600: Redis 自动删除过期数据 ✅ - mqtt:callback:abc-123 自动过期删除 - mqtt:topic:dji/SN9527/response 自动过期删除 【推荐配置】 TTL 应该设置为回调超时时间的 2-3 倍,例如: - 回调超时: 10 秒 - Redis TTL: 30 秒(10秒 × 3) 这样可以确保: ✅ 正常情况下,主动清理会在 10 秒内完成 ✅ 异常情况下,Redis 会在 30 秒后自动清理 ✅ 避免设置过长的 TTL 导致内存浪费 【注意事项】 ⚠️ Topic 索引的 TTL 问题: 如果同一个 topic 有多个回调,每次添加新回调时都会刷新 TTL。 这可能导致索引的 TTL 比单个回调的 TTL 更长。 解决方案: 方案1: 不为 Topic 索引设置 TTL,只在删除最后一个 callbackId 时删除索引 方案2: 每次查询时过滤掉已过期的 callbackId(推荐) ↓ 节点A: 本地内存存储 Consumer - localHandlers.put(callbackId, consumer) ↓ 节点A: 订阅 Redis Pub/Sub 频道 - Channel: mqtt:node:nodeA ┌─────────────────────────────────────────────────────────────────┐ │ 步骤2: MQTT Broker 将响应路由到节点B(不是节点A) │ └─────────────────────────────────────────────────────────────────┘ MQTT Broker: 收到设备响应 ↓ MQTT Broker: 将消息路由到节点B(随机/轮询) ↓ 节点B: MqttCallbackRegistry.handleMessage(topic, messageBody) ┌─────────────────────────────────────────────────────────────────┐ │ 步骤3: 节点B 从 Redis 查询等待该 topic 的回调 │ └─────────────────────────────────────────────────────────────────┘ 节点B: callbackStore.getCallbacksByTopic("dji/SN9527/response") ↓ Redis: 查询 mqtt:topic:dji/SN9527/response ↓ Redis: 返回 Set ↓ Redis: 批量获取回调信息 - mqtt:callback:{callbackId1} → {nodeId="nodeA", ...} - mqtt:callback:{callbackId2} → {nodeId="nodeA", ...} ↓ 节点B: 获得回调列表 List ┌─────────────────────────────────────────────────────────────────┐ │ 步骤4: 节点B 判断回调属于哪个节点 │ └─────────────────────────────────────────────────────────────────┘ 节点B: for (MqttCallbackInfo callback : callbacks) { if (nodeId.equals(callback.getNodeId())) { // 本节点的回调,直接执行 executeLocalCallback(...) } else { // 其他节点的回调,转发到目标节点 callbackStore.publishMessageToNode(...) } } ┌─────────────────────────────────────────────────────────────────┐ │ 步骤5: 节点B 通过 Redis Pub/Sub 转发消息到节点A │ └─────────────────────────────────┘ 节点B: callbackStore.publishMessageToNode( nodeId="nodeA", callbackId="xxx", messageBody="{...}" // JSON 字符串 ) ↓ Redis Pub/Sub: PUBLISH mqtt:node:nodeA { "callbackId": "xxx", "messageBody": "{...}" } ┌─────────────────────────────────────────────────────────────────┐ │ 步骤6: 节点A 收到 Redis Pub/Sub 消息 │ └─────────────────────────────────────────────────────────────────┘ 节点A: Redis Pub/Sub Listener 收到消息 ↓ 节点A: handleNodeMessage(callbackId, messageBodyJson) ↓ 节点A: 反序列化消息体 - Object messageBody = objectMapper.readValue(messageBodyJson) ↓ 节点A: executeLocalCallback(callbackId, messageBody) ↓ 节点A: 从本地内存获取 Consumer - Consumer handler = localHandlers.get(callbackId) ↓ 节点A: 执行回调 - handler.accept(messageBody) ↓ ✅ 命令执行成功!