thingsboard-client-demo/src/main/java/com/tuoheng/machine
孙小云 7c260acdf9 更新说明文件 2025-12-18 17:09:41 +08:00
..
command 实现完全异步化 2025-12-18 16:49:18 +08:00
config 实现完全异步化 2025-12-18 16:49:18 +08:00
instruction 修改类结构 2025-12-18 13:47:41 +08:00
mqtt 添加Lua脚本,增加健壮性 2025-12-18 16:30:10 +08:00
state 修改框架内容 2025-12-18 13:22:34 +08:00
statemachine 修改测试用例 2025-12-18 16:19:55 +08:00
vendor 添加MySql配置项 2025-12-18 15:46:16 +08:00
MachineCommandManager.java 修改测试用例 2025-12-18 16:19:55 +08:00
readme.txt 更新说明文件 2025-12-18 17:09:41 +08:00

readme.txt

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

  生产需要实现
  MysqlSnVendorMappingRepository 这个类
  MQTT回调的地方需要转发到MqttCallbackRegistry 这个类
  需要实现 MqttClient 这边消息发送的逻辑
  需要配置 sn.repository.type=mysql


  单节点部署(默认)

  # 使用内存存储(默认配置)
  machine.state.store.type=memory

  # 以下配置生产需要修改为mysql同时实现 MysqlSnVendorMappingRepository 这个类
  sn.repository.type=memory


  多节点部署
  # 切换到 Redis 存储
  machine.state.store.type=redis
  # 配置节点ID可选不配置会自动生成
  machine.node.id=node-1
  #本地启动redis
  #docker run --name some-redis -d -p 6379:6379 redi
  # Redis 配置
  spring.redis.host=localhost
  spring.redis.port=6379
  spring.redis.password=your-password


┌─────────────────────────────────────────────────────────────────┐
  │ 步骤1: 节点A 执行命令并注册回调                                    │
  └─────────────────────────────────────────────────────────────────┘
  节点A: executeCommand(TAKE_OFF)
    ↓
  节点A: registerCallback(topic="dji/SN9527/response", ...)
    ↓
  节点A: MqttCallbackStore.registerCallback()
    ↓
  Redis: 存储回调信息(使用两个 Key 的原因:性能优化)

    【Key 1】回调详细信息Hash 结构)
    - Key: mqtt:callback:{callbackId}
    - Value: {callbackId, topic, nodeId="nodeA", timeoutMs, registerTime, ...}
    - 作用: 存储单个回调的完整信息
    - 查询: O(1) 时间复杂度,通过 callbackId 直接获取

    【Key 2】Topic 索引Set 结构)
    - Key: mqtt:topic:dji/SN9527/response
    - Value: Set<callbackId>  // 例如: ["abc-123", "def-456", "ghi-789"]
    - 作用: 快速查询等待某个 topic 的所有回调
    - 查询: O(1) 时间复杂度,直接获取 callbackId 列表

    【为什么需要两个 Key】
    如果只用一个 Key 存储所有回调,查询时需要遍历所有回调并过滤 topic
    时间复杂度为 O(n)。使用 Topic 索引后,可以直接获取目标回调列表,
    时间复杂度降为 O(1),大幅提升性能。

    【示例】
    假设有 3 个回调:
    - callbackId="abc-123", topic="dji/SN9527/response", nodeId="nodeA"
    - callbackId="def-456", topic="dji/SN9527/state", nodeId="nodeB"
    - callbackId="ghi-789", topic="dji/SN9527/response", nodeId="nodeA"

    Redis 存储结构:
    mqtt:callback:abc-123 → {callbackId:"abc-123", topic:"dji/SN9527/response", nodeId:"nodeA"}
    mqtt:callback:def-456 → {callbackId:"def-456", topic:"dji/SN9527/state", nodeId:"nodeB"}
    mqtt:callback:ghi-789 → {callbackId:"ghi-789", topic:"dji/SN9527/response", nodeId:"nodeA"}
    mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"]
    mqtt:topic:dji/SN9527/state → ["def-456"]

    查询 topic="dji/SN9527/response" 的回调:
    1. 从索引获取: SMEMBERS mqtt:topic:dji/SN9527/response → ["abc-123", "ghi-789"]
    2. 批量获取详情: MGET mqtt:callback:abc-123 mqtt:callback:ghi-789
    3. 总耗时: O(1) + O(k)k 是该 topic 的回调数量(通常很小)

    【Redis 数据清理时机】
    Redis 中的回调数据有两种清理机制:

    ┌─────────────────────────────────────────────────────────────┐
    │ 1⃣ 主动清理(业务逻辑触发)                                  │
    └─────────────────────────────────────────────────────────────┘
    触发时机:
    ✅ 回调成功执行后TransactionExecutor 的 finally 块)
    ✅ 回调超时后TransactionExecutor 的 finally 块)
    ✅ handleMessage 检测到超时(转发前检查)

    清理操作:
    unregisterCallback(callbackId)
      ↓
    1. 获取回调信息: GET mqtt:callback:{callbackId}
    2. 删除回调信息: DEL mqtt:callback:{callbackId}
    3. 从索引中移除: SREM mqtt:topic:{topic} {callbackId}

    示例:
    T0:  注册回调,超时时间 10 秒
    T5:  收到 MQTT 响应,回调执行成功
    T5:  立即清理 Redis 数据 ✅
         - DEL mqtt:callback:abc-123
         - SREM mqtt:topic:dji/SN9527/response abc-123

    ┌─────────────────────────────────────────────────────────────┐
    │ 2⃣ 被动清理Redis TTL 自动过期)                            │
    └─────────────────────────────────────────────────────────────┘
    作用:兜底机制,防止异常情况下的数据残留

    设置方式:
    // 注册回调时设置 TTL
    SET mqtt:callback:{callbackId} {json} EX 3600  // 1小时后自动过期
    EXPIRE mqtt:topic:{topic} 3600                 // 1小时后自动过期

    触发时机:
    ⚠️  应用异常崩溃,主动清理未执行
    ⚠️  网络分区,无法删除 Redis 数据
    ⚠️  代码 Bug主动清理失败

    示例:
    T0:     注册回调TTL=3600秒1小时
    T5:     应用崩溃,主动清理未执行 ❌
    T3600:  Redis 自动删除过期数据 ✅
            - mqtt:callback:abc-123 自动过期删除
            - mqtt:topic:dji/SN9527/response 自动过期删除

    【推荐配置】
    TTL 应该设置为回调超时时间的 2-3 倍,例如:
    - 回调超时: 10 秒
    - Redis TTL: 30 秒10秒 × 3

    这样可以确保:
    ✅ 正常情况下,主动清理会在 10 秒内完成
    ✅ 异常情况下Redis 会在 30 秒后自动清理
    ✅ 避免设置过长的 TTL 导致内存浪费

    【注意事项】
    ⚠️  Topic 索引的 TTL 问题:
    如果同一个 topic 有多个回调,每次添加新回调时都会刷新 TTL。
    这可能导致索引的 TTL 比单个回调的 TTL 更长。

    解决方案:
    方案1: 不为 Topic 索引设置 TTL只在删除最后一个 callbackId 时删除索引
    方案2: 每次查询时过滤掉已过期的 callbackId推荐
    ↓
  节点A: 本地内存存储 Consumer<Object>
    - localHandlers.put(callbackId, consumer)
    ↓
  节点A: 订阅 Redis Pub/Sub 频道
    - Channel: mqtt:node:nodeA

  ┌─────────────────────────────────────────────────────────────────┐
  │ 步骤2: MQTT Broker 将响应路由到节点B不是节点A                  │
  └─────────────────────────────────────────────────────────────────┘
  MQTT Broker: 收到设备响应
    ↓
  MQTT Broker: 将消息路由到节点B随机/轮询)
    ↓
  节点B: MqttCallbackRegistry.handleMessage(topic, messageBody)

  ┌─────────────────────────────────────────────────────────────────┐
  │ 步骤3: 节点B 从 Redis 查询等待该 topic 的回调                      │
  └─────────────────────────────────────────────────────────────────┘
  节点B: callbackStore.getCallbacksByTopic("dji/SN9527/response")
    ↓
  Redis: 查询 mqtt:topic:dji/SN9527/response
    ↓
  Redis: 返回 Set<callbackId>
    ↓
  Redis: 批量获取回调信息
    - mqtt:callback:{callbackId1} → {nodeId="nodeA", ...}
    - mqtt:callback:{callbackId2} → {nodeId="nodeA", ...}
    ↓
  节点B: 获得回调列表 List<MqttCallbackInfo>

  ┌─────────────────────────────────────────────────────────────────┐
  │ 步骤4: 节点B 判断回调属于哪个节点                                  │
  └─────────────────────────────────────────────────────────────────┘
  节点B: for (MqttCallbackInfo callback : callbacks) {
    if (nodeId.equals(callback.getNodeId())) {
      // 本节点的回调,直接执行
      executeLocalCallback(...)
    } else {
      // 其他节点的回调,转发到目标节点
      callbackStore.publishMessageToNode(...)
    }
  }

  ┌─────────────────────────────────────────────────────────────────┐
  │ 步骤5: 节点B 通过 Redis Pub/Sub 转发消息到节点A                    │
  └─────────────────────────────────┘
  节点B: callbackStore.publishMessageToNode(
    nodeId="nodeA",
    callbackId="xxx",
    messageBody="{...}"  // JSON 字符串
  )
    ↓
  Redis Pub/Sub: PUBLISH mqtt:node:nodeA
    {
      "callbackId": "xxx",
      "messageBody": "{...}"
    }

  ┌─────────────────────────────────────────────────────────────────┐
  │ 步骤6: 节点A 收到 Redis Pub/Sub 消息                │
  └─────────────────────────────────────────────────────────────────┘
  节点A: Redis Pub/Sub Listener 收到消息
    ↓
  节点A: handleNodeMessage(callbackId, messageBodyJson)
    ↓
  节点A: 反序列化消息体
    - Object messageBody = objectMapper.readValue(messageBodyJson)
    ↓
  节点A: executeLocalCallback(callbackId, messageBody)
    ↓
  节点A: 从本地内存获取 Consumer
    - Consumer<Object> handler = localHandlers.get(callbackId)
    ↓
  节点A: 执行回调
    - handler.accept(messageBody)
    ↓
  ✅ 命令执行成功!