208 lines
10 KiB
Plaintext
208 lines
10 KiB
Plaintext
|
||
单节点部署(默认)
|
||
|
||
# 使用内存存储(默认配置)
|
||
machine.state.store.type=memory
|
||
|
||
多节点部署
|
||
# 切换到 Redis 存储
|
||
machine.state.store.type=redis
|
||
# 配置节点ID(可选,不配置会自动生成)
|
||
machine.node.id=node-1
|
||
#本地启动redis
|
||
#docker run --name some-redis -d -p 6379:6379 redi
|
||
# Redis 配置
|
||
spring.redis.host=localhost
|
||
spring.redis.port=6379
|
||
spring.redis.password=your-password
|
||
|
||
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 步骤1: 节点A 执行命令并注册回调 │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
节点A: executeCommand(TAKE_OFF)
|
||
↓
|
||
节点A: registerCallback(topic="dji/SN9527/response", ...)
|
||
↓
|
||
节点A: MqttCallbackStore.registerCallback()
|
||
↓
|
||
Redis: 存储回调信息(使用两个 Key 的原因:性能优化)
|
||
|
||
【Key 1】回调详细信息(Hash 结构)
|
||
- Key: mqtt:callback:{callbackId}
|
||
- Value: {callbackId, topic, nodeId="nodeA", timeoutMs, registerTime, ...}
|
||
- 作用: 存储单个回调的完整信息
|
||
- 查询: O(1) 时间复杂度,通过 callbackId 直接获取
|
||
|
||
【Key 2】Topic 索引(Set 结构)
|
||
- Key: mqtt:topic:dji/SN9527/response
|
||
- Value: Set<callbackId> // 例如: ["abc-123", "def-456", "ghi-789"]
|
||
- 作用: 快速查询等待某个 topic 的所有回调
|
||
- 查询: O(1) 时间复杂度,直接获取 callbackId 列表
|
||
|
||
【为什么需要两个 Key?】
|
||
如果只用一个 Key 存储所有回调,查询时需要遍历所有回调并过滤 topic,
|
||
时间复杂度为 O(n)。使用 Topic 索引后,可以直接获取目标回调列表,
|
||
时间复杂度降为 O(1),大幅提升性能。
|
||
|
||
【示例】
|
||
假设有 3 个回调:
|
||
- callbackId="abc-123", topic="dji/SN9527/response", nodeId="nodeA"
|
||
- callbackId="def-456", topic="dji/SN9527/state", nodeId="nodeB"
|
||
- callbackId="ghi-789", topic="dji/SN9527/response", nodeId="nodeA"
|
||
|
||
Redis 存储结构:
|
||
mqtt:callback:abc-123 → {callbackId:"abc-123", topic:"dji/SN9527/response", nodeId:"nodeA"}
|
||
mqtt:callback:def-456 → {callbackId:"def-456", topic:"dji/SN9527/state", nodeId:"nodeB"}
|
||
mqtt:callback:ghi-789 → {callbackId:"ghi-789", topic:"dji/SN9527/response", nodeId:"nodeA"}
|
||
mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"]
|
||
mqtt:topic:dji/SN9527/state → ["def-456"]
|
||
|
||
查询 topic="dji/SN9527/response" 的回调:
|
||
1. 从索引获取: SMEMBERS mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"]
|
||
2. 批量获取详情: MGET mqtt:callback:abc-123 mqtt:callback:ghi-789
|
||
3. 总耗时: O(1) + O(k),k 是该 topic 的回调数量(通常很小)
|
||
|
||
【Redis 数据清理时机】
|
||
Redis 中的回调数据有两种清理机制:
|
||
|
||
┌─────────────────────────────────────────────────────────────┐
|
||
│ 1️⃣ 主动清理(业务逻辑触发) │
|
||
└─────────────────────────────────────────────────────────────┘
|
||
触发时机:
|
||
✅ 回调成功执行后(TransactionExecutor 的 finally 块)
|
||
✅ 回调超时后(TransactionExecutor 的 finally 块)
|
||
✅ handleMessage 检测到超时(转发前检查)
|
||
|
||
清理操作:
|
||
unregisterCallback(callbackId)
|
||
↓
|
||
1. 获取回调信息: GET mqtt:callback:{callbackId}
|
||
2. 删除回调信息: DEL mqtt:callback:{callbackId}
|
||
3. 从索引中移除: SREM mqtt:topic:{topic} {callbackId}
|
||
|
||
示例:
|
||
T0: 注册回调,超时时间 10 秒
|
||
T5: 收到 MQTT 响应,回调执行成功
|
||
T5: 立即清理 Redis 数据 ✅
|
||
- DEL mqtt:callback:abc-123
|
||
- SREM mqtt:topic:dji/SN9527/response abc-123
|
||
|
||
┌─────────────────────────────────────────────────────────────┐
|
||
│ 2️⃣ 被动清理(Redis TTL 自动过期) │
|
||
└─────────────────────────────────────────────────────────────┘
|
||
作用:兜底机制,防止异常情况下的数据残留
|
||
|
||
设置方式:
|
||
// 注册回调时设置 TTL
|
||
SET mqtt:callback:{callbackId} {json} EX 3600 // 1小时后自动过期
|
||
EXPIRE mqtt:topic:{topic} 3600 // 1小时后自动过期
|
||
|
||
触发时机:
|
||
⚠️ 应用异常崩溃,主动清理未执行
|
||
⚠️ 网络分区,无法删除 Redis 数据
|
||
⚠️ 代码 Bug,主动清理失败
|
||
|
||
示例:
|
||
T0: 注册回调,TTL=3600秒(1小时)
|
||
T5: 应用崩溃,主动清理未执行 ❌
|
||
T3600: Redis 自动删除过期数据 ✅
|
||
- mqtt:callback:abc-123 自动过期删除
|
||
- mqtt:topic:dji/SN9527/response 自动过期删除
|
||
|
||
【推荐配置】
|
||
TTL 应该设置为回调超时时间的 2-3 倍,例如:
|
||
- 回调超时: 10 秒
|
||
- Redis TTL: 30 秒(10秒 × 3)
|
||
|
||
这样可以确保:
|
||
✅ 正常情况下,主动清理会在 10 秒内完成
|
||
✅ 异常情况下,Redis 会在 30 秒后自动清理
|
||
✅ 避免设置过长的 TTL 导致内存浪费
|
||
|
||
【注意事项】
|
||
⚠️ Topic 索引的 TTL 问题:
|
||
如果同一个 topic 有多个回调,每次添加新回调时都会刷新 TTL。
|
||
这可能导致索引的 TTL 比单个回调的 TTL 更长。
|
||
|
||
解决方案:
|
||
方案1: 不为 Topic 索引设置 TTL,只在删除最后一个 callbackId 时删除索引
|
||
方案2: 每次查询时过滤掉已过期的 callbackId(推荐)
|
||
↓
|
||
节点A: 本地内存存储 Consumer<Object>
|
||
- localHandlers.put(callbackId, consumer)
|
||
↓
|
||
节点A: 订阅 Redis Pub/Sub 频道
|
||
- Channel: mqtt:node:nodeA
|
||
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 步骤2: MQTT Broker 将响应路由到节点B(不是节点A) │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
MQTT Broker: 收到设备响应
|
||
↓
|
||
MQTT Broker: 将消息路由到节点B(随机/轮询)
|
||
↓
|
||
节点B: MqttCallbackRegistry.handleMessage(topic, messageBody)
|
||
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 步骤3: 节点B 从 Redis 查询等待该 topic 的回调 │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
节点B: callbackStore.getCallbacksByTopic("dji/SN9527/response")
|
||
↓
|
||
Redis: 查询 mqtt:topic:dji/SN9527/response
|
||
↓
|
||
Redis: 返回 Set<callbackId>
|
||
↓
|
||
Redis: 批量获取回调信息
|
||
- mqtt:callback:{callbackId1} → {nodeId="nodeA", ...}
|
||
- mqtt:callback:{callbackId2} → {nodeId="nodeA", ...}
|
||
↓
|
||
节点B: 获得回调列表 List<MqttCallbackInfo>
|
||
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 步骤4: 节点B 判断回调属于哪个节点 │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
节点B: for (MqttCallbackInfo callback : callbacks) {
|
||
if (nodeId.equals(callback.getNodeId())) {
|
||
// 本节点的回调,直接执行
|
||
executeLocalCallback(...)
|
||
} else {
|
||
// 其他节点的回调,转发到目标节点
|
||
callbackStore.publishMessageToNode(...)
|
||
}
|
||
}
|
||
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 步骤5: 节点B 通过 Redis Pub/Sub 转发消息到节点A │
|
||
└─────────────────────────────────┘
|
||
节点B: callbackStore.publishMessageToNode(
|
||
nodeId="nodeA",
|
||
callbackId="xxx",
|
||
messageBody="{...}" // JSON 字符串
|
||
)
|
||
↓
|
||
Redis Pub/Sub: PUBLISH mqtt:node:nodeA
|
||
{
|
||
"callbackId": "xxx",
|
||
"messageBody": "{...}"
|
||
}
|
||
|
||
┌─────────────────────────────────────────────────────────────────┐
|
||
│ 步骤6: 节点A 收到 Redis Pub/Sub 消息 │
|
||
└─────────────────────────────────────────────────────────────────┘
|
||
节点A: Redis Pub/Sub Listener 收到消息
|
||
↓
|
||
节点A: handleNodeMessage(callbackId, messageBodyJson)
|
||
↓
|
||
节点A: 反序列化消息体
|
||
- Object messageBody = objectMapper.readValue(messageBodyJson)
|
||
↓
|
||
节点A: executeLocalCallback(callbackId, messageBody)
|
||
↓
|
||
节点A: 从本地内存获取 Consumer
|
||
- Consumer<Object> handler = localHandlers.get(callbackId)
|
||
↓
|
||
节点A: 执行回调
|
||
- handler.accept(messageBody)
|
||
↓
|
||
✅ 命令执行成功! |