From 8f518bca79381b77568e0f7e7f102149856b280a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Wed, 17 Dec 2025 11:16:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=8C=87=E4=BB=A4=E7=BC=96?= =?UTF-8?q?=E6=8E=92=E7=9A=84=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../machine/CONDITIONAL_BRANCH_GUIDE.md | 364 +++++++++++++++ .../tuoheng/machine/command/Transaction.java | 98 +++- .../machine/command/TransactionExecutor.java | 176 +++++-- .../example/ConditionalBranchExample.java | 433 ++++++++++++++++++ .../machine/instruction/InstructionNode.java | 90 ++++ 5 files changed, 1128 insertions(+), 33 deletions(-) create mode 100644 src/main/java/com/tuoheng/machine/CONDITIONAL_BRANCH_GUIDE.md create mode 100644 src/main/java/com/tuoheng/machine/example/ConditionalBranchExample.java create mode 100644 src/main/java/com/tuoheng/machine/instruction/InstructionNode.java diff --git a/src/main/java/com/tuoheng/machine/CONDITIONAL_BRANCH_GUIDE.md b/src/main/java/com/tuoheng/machine/CONDITIONAL_BRANCH_GUIDE.md new file mode 100644 index 0000000..90aca50 --- /dev/null +++ b/src/main/java/com/tuoheng/machine/CONDITIONAL_BRANCH_GUIDE.md @@ -0,0 +1,364 @@ +# 条件分支执行指南 + +## 概述 + +框架现在支持基于状态回调结果的条件分支执行。这意味着你可以根据指令的执行结果(成功/失败)动态决定下一步执行哪个指令。 + +## 核心概念 + +### 1. InstructionNode(指令节点) + +指令节点是事务执行的基本单元,支持配置三种类型的后续节点: + +```java +InstructionNode node = new InstructionNode("nodeId", instruction) + .always("nextNodeId") // 无论成功失败都执行 + .onSuccess("successNodeId") // 成功时执行 + .onFailure("failureNodeId"); // 失败时执行 +``` + +**优先级**:`always` > `onSuccess/onFailure` + +### 2. 条件分支触发条件 + +只有当指令满足以下条件时,才会根据状态回调结果进行分支: + +1. **配置了 `stateCallback`**(状态回调配置) +2. **`canShortCircuit = false`**(不可短路,必须等待状态回调) + +```java +@Override +public CallbackConfig getStateCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/state") + .fieldPath("droneState") + .expectedValue("FLYING") + .canShortCircuit(false) // 关键:必须为 false + .timeoutMs(60000) + .build(); +} +``` + +### 3. 分支决策逻辑 + +当指令配置了状态回调且 `canShortCircuit=false` 时: + +1. **状态回调成功** → 执行 `onSuccess` 配置的节点 +2. **状态回调失败/超时** → 执行 `onFailure` 配置的节点 +3. **如果找不到对应的下一步节点** → 将状态回调结果作为事务结果返回 + +## 使用场景 + +### 场景1:简单的成功/失败分支 + +```java +Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF); + +// 检查设备状态 +InstructionNode checkNode = new InstructionNode("check", new CheckDeviceInstruction()) + .onSuccess("takeoff") // 状态正常 -> 起飞 + .onFailure("repair"); // 状态异常 -> 修复 + +// 起飞 +InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction()); + +// 修复后再起飞 +InstructionNode repairNode = new InstructionNode("repair", new RepairDeviceInstruction()) + .always("takeoff"); + +transaction.addNode(checkNode) + .addNode(takeoffNode) + .addNode(repairNode) + .setStartNode("check"); +``` + +**执行流程**: +``` +check (检查设备) + ├─ 成功 → takeoff (起飞) → 结束 + └─ 失败 → repair (修复) → takeoff (起飞) → 结束 +``` + +### 场景2:重试机制 + +```java +Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF); + +// 起飞(带状态回调) +InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction()) + .onSuccess("start_mission") // 起飞成功 -> 开始任务 + .onFailure("retry_takeoff"); // 起飞失败 -> 重试 + +// 开始任务 +InstructionNode startMissionNode = new InstructionNode("start_mission", new StartMissionInstruction()); + +// 重试起飞 +InstructionNode retryNode = new InstructionNode("retry_takeoff", new RetryTakeOffInstruction()) + .onSuccess("start_mission") // 重试成功 -> 开始任务 + .onFailure("emergency_land"); // 重试失败 -> 紧急降落 + +// 紧急降落 +InstructionNode emergencyLandNode = new InstructionNode("emergency_land", new EmergencyLandInstruction()); + +transaction.addNode(takeoffNode) + .addNode(startMissionNode) + .addNode(retryNode) + .addNode(emergencyLandNode) + .setStartNode("takeoff"); +``` + +**执行流程**: +``` +takeoff (起飞) + ├─ 成功 → start_mission (开始任务) → 结束 + └─ 失败 → retry_takeoff (重试) + ├─ 成功 → start_mission (开始任务) → 结束 + └─ 失败 → emergency_land (紧急降落) → 结束 +``` + +### 场景3:状态回调结果直接作为事务结果 + +```java +Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF); + +// 只有一个节点,没有配置下一步 +InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction()); +// 注意:没有配置 onSuccess 或 onFailure + +transaction.addNode(takeoffNode) + .setStartNode("takeoff"); +``` + +**执行流程**: +``` +takeoff (起飞,等待状态回调) + ├─ 状态回调成功 → 事务成功(没有下一步节点) + └─ 状态回调失败 → 事务失败(没有下一步节点) +``` + +### 场景4:复杂的多分支流程 + +```java +Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION); + +// 1. 打开舱门 +InstructionNode openCoverNode = new InstructionNode("open_cover", new OpenCoverInstruction()) + .always("check_weather"); + +// 2. 检查天气 +InstructionNode checkWeatherNode = new InstructionNode("check_weather", new CheckWeatherInstruction()) + .onSuccess("takeoff") // 天气好 -> 起飞 + .onFailure("handle_bad_weather"); // 天气差 -> 处理 + +// 3. 起飞 +InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction()) + .onSuccess("execute_mission") // 起飞成功 -> 执行任务 + .onFailure("close_cover"); // 起飞失败 -> 关闭舱门 + +// 4. 执行任务 +InstructionNode executeMissionNode = new InstructionNode("execute_mission", new ExecuteMissionInstruction()); + +// 5. 处理恶劣天气 +InstructionNode handleBadWeatherNode = new InstructionNode("handle_bad_weather", new HandleBadWeatherInstruction()) + .onSuccess("wait_and_retry") // 可以等待 -> 等待后重试 + .onFailure("close_cover"); // 天气极差 -> 关闭舱门 + +// 6. 等待并重试 +InstructionNode waitAndRetryNode = new InstructionNode("wait_and_retry", new WaitInstruction()) + .always("check_weather"); // 等待后 -> 重新检查天气 + +// 7. 关闭舱门 +InstructionNode closeCoverNode = new InstructionNode("close_cover", new CloseCoverInstruction()); + +transaction.addNode(openCoverNode) + .addNode(checkWeatherNode) + .addNode(takeoffNode) + .addNode(executeMissionNode) + .addNode(handleBadWeatherNode) + .addNode(waitAndRetryNode) + .addNode(closeCoverNode) + .setStartNode("open_cover") + .setTimeout(300000); +``` + +**执行流程**: +``` +open_cover (打开舱门) + → check_weather (检查天气) + ├─ 天气好 → takeoff (起飞) + │ ├─ 成功 → execute_mission (执行任务) → 结束 + │ └─ 失败 → close_cover (关闭舱门) → 结束 + └─ 天气差 → handle_bad_weather (处理恶劣天气) + ├─ 可等待 → wait_and_retry (等待) → check_weather (重新检查) + └─ 极差 → close_cover (关闭舱门) → 结束 +``` + +## 指令实现要点 + +### 1. 配置状态回调以支持条件分支 + +```java +public class MyInstruction extends AbstractInstruction { + + @Override + public String getName() { + return "MY_INSTRUCTION"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + // 发送MQTT指令 + String sn = context.getSn(); + mqttClient.publish("device/" + sn + "/command", "{\"cmd\":\"myCommand\"}"); + } + + @Override + public CallbackConfig getMethodCallbackConfig(InstructionContext context) { + // 方法回调:等待指令ACK + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/response") + .fieldPath("cmd") + .expectedValue("myCommand") + .canShortCircuit(false) + .timeoutMs(10000) + .build(); + } + + @Override + public CallbackConfig getStateCallbackConfig(InstructionContext context) { + // 状态回调:等待状态变化 + // 关键:canShortCircuit=false,表示必须根据状态回调结果决定下一步 + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/state") + .fieldPath("status") + .expectedValue("SUCCESS") + .canShortCircuit(false) // 必须为 false + .timeoutMs(60000) + .build(); + } +} +``` + +### 2. 使用自定义判断逻辑 + +```java +@Override +public CallbackConfig getStateCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/state") + .customPredicate(messageBody -> { + // 自定义判断逻辑 + Map data = (Map) messageBody; + String status = (String) data.get("status"); + Integer errorCode = (Integer) data.get("errorCode"); + + // 只有状态为SUCCESS且错误码为0才算成功 + return "SUCCESS".equals(status) && errorCode == 0; + }) + .canShortCircuit(false) + .timeoutMs(60000) + .build(); +} +``` + +## 与旧版本的兼容性 + +框架完全兼容旧版本的线性指令链: + +```java +// 旧版本写法(仍然支持) +Transaction transaction = new Transaction("起飞", CommandType.TAKE_OFF) + .addInstruction(new Instruction1()) + .addInstruction(new Instruction2()) + .addInstruction(new Instruction3()); + +// 新版本写法(支持条件分支) +Transaction transaction = new Transaction("起飞", CommandType.TAKE_OFF) + .addNode(new InstructionNode("node1", new Instruction1()).always("node2")) + .addNode(new InstructionNode("node2", new Instruction2()).always("node3")) + .addNode(new InstructionNode("node3", new Instruction3())); +``` + +旧版本的 `addInstruction` 方法会自动转换为节点链,所有现有代码无需修改。 + +## 关键规则总结 + +1. **条件分支触发条件**: + - 必须配置 `stateCallback` + - `canShortCircuit` 必须为 `false` + +2. **分支优先级**: + - `always` > `onSuccess/onFailure` + - 如果配置了 `always`,则无论成功失败都执行该节点 + +3. **找不到下一步节点时**: + - 如果指令配置了状态回调且 `canShortCircuit=false` + - 但找不到对应的下一步节点(`onSuccess` 或 `onFailure`) + - 则状态回调的结果直接作为事务结果返回 + +4. **超时处理**: + - 状态回调超时视为失败 + - 会执行 `onFailure` 配置的节点 + - 如果没有配置 `onFailure`,则事务失败 + +5. **循环检测**: + - 框架不会自动检测循环 + - 请确保事务超时时间足够长 + - 避免无限循环(如 A → B → A) + +## 调试技巧 + +1. **查看执行日志**: +``` +执行指令节点: nodeId=check, instruction=CHECK_DEVICE +根据状态回调结果选择下一个节点: success=true, nextNodeId=takeoff +执行指令节点: nodeId=takeoff, instruction=TAKE_OFF +状态回调完成,无下一步指令,事务结束: success=true, nodeId=takeoff +``` + +2. **使用 `onComplete` 回调记录指令执行结果**: +```java +@Override +public void onComplete(InstructionContext context, InstructionResult result) { + log.info("指令执行完成: instruction={}, success={}, error={}", + getName(), result.isSuccess(), result.getErrorMessage()); +} +``` + +3. **注册命令执行监听器**: +```java +commandManager.registerCommandListener("debug-listener", (sn, result) -> { + log.info("命令执行完成: sn={}, commandType={}, success={}, failedInstruction={}", + sn, result.getCommandType(), result.isSuccess(), result.getFailedInstructionName()); +}); +``` + +## 最佳实践 + +1. **合理设置超时时间**: + - 指令超时 < 事务超时 + - 考虑重试次数和等待时间 + +2. **避免过深的分支嵌套**: + - 建议分支深度不超过3层 + - 复杂流程可以拆分为多个事务 + +3. **提供失败分支**: + - 关键指令应该配置 `onFailure` 分支 + - 避免因为一个指令失败导致整个事务失败 + +4. **使用有意义的节点ID**: + - 使用描述性的节点ID,便于调试 + - 例如:`check_weather`、`takeoff`、`retry_takeoff` + +5. **记录状态变化**: + - 在指令的 `onComplete` 中记录执行结果 + - 便于排查问题和优化流程 + +## 完整示例 + +参考 `ConditionalBranchExample.java` 文件,包含4个完整的示例: +- 示例1:简单的成功/失败分支 +- 示例2:基于状态回调结果的分支 +- 示例3:复杂的多分支流程 +- 示例4:状态回调结果直接作为事务结果 diff --git a/src/main/java/com/tuoheng/machine/command/Transaction.java b/src/main/java/com/tuoheng/machine/command/Transaction.java index c0cfb6f..89348ab 100644 --- a/src/main/java/com/tuoheng/machine/command/Transaction.java +++ b/src/main/java/com/tuoheng/machine/command/Transaction.java @@ -1,13 +1,16 @@ package com.tuoheng.machine.command; import com.tuoheng.machine.instruction.Instruction; +import com.tuoheng.machine.instruction.InstructionNode; import lombok.Data; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** - * 事务(由多个指令组成) + * 事务(由多个指令节点组成,支持条件分支) */ @Data public class Transaction { @@ -22,25 +25,98 @@ public class Transaction { private CommandType commandType; /** - * 指令列表(按顺序执行) + * 指令节点映射(nodeId -> InstructionNode) */ - private List instructions = new ArrayList<>(); + private Map nodeMap = new HashMap<>(); + + /** + * 起始节点ID + */ + private String startNodeId; /** * 事务超时时间(毫秒) */ private long timeoutMs = 120000; // 默认2分钟 + /** + * 指令列表(按顺序执行)- 兼容旧版本 + * @deprecated 使用 nodeMap 和条件分支代替 + */ + @Deprecated + private List instructions = new ArrayList<>(); + public Transaction(String name, CommandType commandType) { this.name = name; this.commandType = commandType; } /** - * 添加指令 + * 添加指令节点 */ + public Transaction addNode(InstructionNode node) { + nodeMap.put(node.getNodeId(), node); + // 如果是第一个节点,设置为起始节点 + if (startNodeId == null) { + startNodeId = node.getNodeId(); + } + return this; + } + + /** + * 添加指令节点(简化版) + */ + public Transaction addNode(String nodeId, Instruction instruction) { + return addNode(new InstructionNode(nodeId, instruction)); + } + + /** + * 设置起始节点 + */ + public Transaction setStartNode(String nodeId) { + if (!nodeMap.containsKey(nodeId)) { + throw new IllegalArgumentException("节点不存在: " + nodeId); + } + this.startNodeId = nodeId; + return this; + } + + /** + * 获取指令节点 + */ + public InstructionNode getNode(String nodeId) { + return nodeMap.get(nodeId); + } + + /** + * 获取起始节点 + */ + public InstructionNode getStartNode() { + return startNodeId != null ? nodeMap.get(startNodeId) : null; + } + + /** + * 添加指令(兼容旧版本,自动转换为线性节点链) + * @deprecated 使用 addNode 和条件分支代替 + */ + @Deprecated public Transaction addInstruction(Instruction instruction) { this.instructions.add(instruction); + + // 自动转换为节点 + String nodeId = "node_" + instructions.size(); + InstructionNode node = new InstructionNode(nodeId, instruction); + + // 如果不是第一个节点,将上一个节点的 always 指向当前节点 + if (instructions.size() > 1) { + String prevNodeId = "node_" + (instructions.size() - 1); + InstructionNode prevNode = nodeMap.get(prevNodeId); + if (prevNode != null) { + prevNode.always(nodeId); + } + } + + addNode(node); return this; } @@ -51,4 +127,18 @@ public class Transaction { this.timeoutMs = timeoutMs; return this; } + + /** + * 判断是否使用图结构(新版本) + */ + public boolean isGraphMode() { + return !nodeMap.isEmpty(); + } + + /** + * 判断是否使用线性结构(旧版本) + */ + public boolean isLinearMode() { + return !instructions.isEmpty() && nodeMap.isEmpty(); + } } diff --git a/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java b/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java index c138df9..7ee8ace 100644 --- a/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java +++ b/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java @@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** - * 事务执行器 + * 事务执行器(支持条件分支) */ @Slf4j @Component @@ -34,20 +34,90 @@ public class TransactionExecutor { // 在新线程中执行事务 CompletableFuture.runAsync(() -> { try { - // 依次执行每个指令 - for (Instruction instruction : transaction.getInstructions()) { - // 检查事务是否超时 - if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { - log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); - future.complete(CommandResult.timeout(transaction.getCommandType())); - return; + // 判断使用图模式还是线性模式 + if (transaction.isGraphMode()) { + // 新版本:图结构执行(支持条件分支) + executeGraphTransaction(transaction, context, startTime, future); + } else { + // 旧版本:线性执行(兼容旧代码) + executeLinearTransaction(transaction, context, startTime, future); + } + } catch (Exception e) { + log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e); + future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage())); + } + }); + + return future; + } + + /** + * 执行图结构事务(新版本,支持条件分支) + */ + private void executeGraphTransaction(Transaction transaction, InstructionContext context, + long startTime, CompletableFuture future) { + // 从起始节点开始执行 + InstructionNode currentNode = transaction.getStartNode(); + if (currentNode == null) { + log.error("事务没有起始节点: transaction={}", transaction.getName()); + future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有起始节点")); + return; + } + + // 循环执行节点,直到没有下一个节点 + while (currentNode != null) { + // 检查事务是否超时 + if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { + log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); + future.complete(CommandResult.timeout(transaction.getCommandType())); + return; + } + + Instruction instruction = currentNode.getInstruction(); + log.debug("执行指令节点: nodeId={}, instruction={}", currentNode.getNodeId(), instruction.getName()); + + // 执行指令 + InstructionResult result = executeInstruction(instruction, context); + + // 判断是否需要根据状态回调结果决定下一步 + boolean shouldBranch = shouldBranchByStateCallback(instruction, context); + + if (shouldBranch) { + // 根据状态回调结果决定下一步 + String nextNodeId = currentNode.getNextNodeId(result.isSuccess()); + + if (nextNodeId == null) { + // 找不到下一步指令,将状态回调结果作为事务结果返回 + log.info("状态回调完成,无下一步指令,事务结束: success={}, nodeId={}", + result.isSuccess(), currentNode.getNodeId()); + + if (result.isSuccess()) { + future.complete(CommandResult.success(transaction.getCommandType(), result.getData())); + } else { + future.complete(CommandResult.failure( + transaction.getCommandType(), + result.getErrorMessage(), + instruction.getName() + )); } + return; + } - // 执行指令 - InstructionResult result = executeInstruction(instruction, context); + // 获取下一个节点 + currentNode = transaction.getNode(nextNodeId); + log.debug("根据状态回调结果选择下一个节点: success={}, nextNodeId={}", + result.isSuccess(), nextNodeId); - // 如果指令失败,终止事务 - if (!result.isSuccess()) { + } else { + // 不需要分支,按照传统逻辑处理 + if (!result.isSuccess()) { + // 指令失败,检查是否有失败分支 + String nextNodeId = currentNode.getNextNodeId(false); + if (nextNodeId != null) { + currentNode = transaction.getNode(nextNodeId); + log.debug("指令失败,跳转到失败分支: nextNodeId={}", nextNodeId); + } else { + // 没有失败分支,终止事务 log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage()); future.complete(CommandResult.failure( transaction.getCommandType(), @@ -56,21 +126,70 @@ public class TransactionExecutor { )); return; } - - log.debug("指令执行成功: instruction={}", instruction.getName()); + } else { + // 指令成功,获取下一个节点 + String nextNodeId = currentNode.getNextNodeId(true); + if (nextNodeId != null) { + currentNode = transaction.getNode(nextNodeId); + log.debug("指令成功,继续下一个节点: nextNodeId={}", nextNodeId); + } else { + // 没有下一个节点,事务成功完成 + log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn()); + future.complete(CommandResult.success(transaction.getCommandType())); + return; + } } - - // 所有指令执行成功 - log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn()); - future.complete(CommandResult.success(transaction.getCommandType())); - - } catch (Exception e) { - log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e); - future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage())); } - }); + } - return future; + // 正常结束(虽然理论上不会走到这里) + log.info("事务执行完成: transaction={}, sn={}", transaction.getName(), context.getSn()); + future.complete(CommandResult.success(transaction.getCommandType())); + } + + /** + * 执行线性事务(旧版本,兼容旧代码) + */ + private void executeLinearTransaction(Transaction transaction, InstructionContext context, + long startTime, CompletableFuture future) { + // 依次执行每个指令 + for (Instruction instruction : transaction.getInstructions()) { + // 检查事务是否超时 + if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { + log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); + future.complete(CommandResult.timeout(transaction.getCommandType())); + return; + } + + // 执行指令 + InstructionResult result = executeInstruction(instruction, context); + + // 如果指令失败,终止事务 + if (!result.isSuccess()) { + log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage()); + future.complete(CommandResult.failure( + transaction.getCommandType(), + result.getErrorMessage(), + instruction.getName() + )); + return; + } + + log.debug("指令执行成功: instruction={}", instruction.getName()); + } + + // 所有指令执行成功 + log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn()); + future.complete(CommandResult.success(transaction.getCommandType())); + } + + /** + * 判断是否需要根据状态回调结果决定下一步 + * 条件:配置了 stateCallback 且 canShortCircuit 为 false + */ + private boolean shouldBranchByStateCallback(Instruction instruction, InstructionContext context) { + CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); + return stateCallback != null && !stateCallback.isCanShortCircuit(); } /** @@ -107,10 +226,9 @@ public class TransactionExecutor { CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); if (stateCallback != null && !stateCallback.isCanShortCircuit()) { InstructionResult stateResult = waitForCallback(stateCallback, context); - if (!stateResult.isSuccess()) { - instruction.onComplete(context, stateResult); - return stateResult; - } + // 注意:这里不立即返回,而是将结果传递给上层,由上层决定下一步 + instruction.onComplete(context, stateResult); + return stateResult; } // 指令执行成功 @@ -119,7 +237,7 @@ public class TransactionExecutor { return result; } catch (Exception e) { - log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e); + log.error("指令执行异常: instruction={}, sn=", instruction.getName(), context.getSn(), e); InstructionResult result = InstructionResult.failure("指令执行异常: " + e.getMessage()); instruction.onComplete(context, result); return result; diff --git a/src/main/java/com/tuoheng/machine/example/ConditionalBranchExample.java b/src/main/java/com/tuoheng/machine/example/ConditionalBranchExample.java new file mode 100644 index 0000000..6620f4a --- /dev/null +++ b/src/main/java/com/tuoheng/machine/example/ConditionalBranchExample.java @@ -0,0 +1,433 @@ +package com.tuoheng.machine.example; + +import com.tuoheng.machine.command.CommandType; +import com.tuoheng.machine.command.Transaction; +import com.tuoheng.machine.instruction.AbstractInstruction; +import com.tuoheng.machine.instruction.CallbackConfig; +import com.tuoheng.machine.instruction.InstructionContext; +import com.tuoheng.machine.instruction.InstructionNode; +import lombok.extern.slf4j.Slf4j; + +/** + * 条件分支执行示例 + * + * 演示如何使用新的图结构和条件分支功能 + */ +@Slf4j +public class ConditionalBranchExample { + + /** + * 示例1:简单的成功/失败分支 + * + * 流程: + * 1. 检查设备状态 + * 2. 如果状态正常 -> 执行起飞 + * 3. 如果状态异常 -> 执行修复操作 + */ + public Transaction example1_SimpleConditionalBranch() { + Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF); + + // 节点1:检查设备状态 + InstructionNode checkNode = new InstructionNode("check", new CheckDeviceInstruction()) + .onSuccess("takeoff") // 成功 -> 起飞 + .onFailure("repair"); // 失败 -> 修复 + + // 节点2:起飞指令 + InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction()); + // 起飞后没有下一步,事务结束 + + // 节点3:修复指令 + InstructionNode repairNode = new InstructionNode("repair", new RepairDeviceInstruction()) + .always("takeoff"); // 修复后 -> 起飞 + + // 添加所有节点 + transaction.addNode(checkNode) + .addNode(takeoffNode) + .addNode(repairNode) + .setStartNode("check"); + + return transaction; + } + + /** + * 示例2:基于状态回调结果的分支 + * + * 流程: + * 1. 发送起飞指令 + * 2. 等待状态回调(stateCallback,canShortCircuit=false) + * 3. 如果起飞成功 -> 开始任务 + * 4. 如果起飞失败 -> 重试起飞 + */ + public Transaction example2_StateCallbackBranch() { + Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF); + + // 节点1:起飞指令(配置了状态回调) + InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction()) + .onSuccess("start_mission") // 状态回调成功 -> 开始任务 + .onFailure("retry_takeoff"); // 状态回调失败 -> 重试 + + // 节点2:开始任务 + InstructionNode startMissionNode = new InstructionNode("start_mission", new StartMissionInstruction()); + + // 节点3:重试起飞 + InstructionNode retryNode = new InstructionNode("retry_takeoff", new RetryTakeOffInstruction()) + .onSuccess("start_mission") // 重试成功 -> 开始任务 + .onFailure("emergency_land"); // 重试失败 -> 紧急降落 + + // 节点4:紧急降落 + InstructionNode emergencyLandNode = new InstructionNode("emergency_land", new EmergencyLandInstruction()); + + transaction.addNode(takeoffNode) + .addNode(startMissionNode) + .addNode(retryNode) + .addNode(emergencyLandNode) + .setStartNode("takeoff"); + + return transaction; + } + + /** + * 示例3:复杂的多分支流程 + * + * 流程: + * 1. 打开舱门 + * 2. 检查天气 + * 3. 如果天气好 -> 起飞 -> 执行任务 + * 4. 如果天气差 -> 等待 -> 重新检查天气 + * 5. 如果天气极差 -> 关闭舱门 -> 取消任务 + */ + public Transaction example3_ComplexBranch() { + Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION); + + // 节点1:打开舱门 + InstructionNode openCoverNode = new InstructionNode("open_cover", new OpenCoverInstruction()) + .always("check_weather"); + + // 节点2:检查天气 + InstructionNode checkWeatherNode = new InstructionNode("check_weather", new CheckWeatherInstruction()) + .onSuccess("takeoff") // 天气好 -> 起飞 + .onFailure("handle_bad_weather"); // 天气差 -> 处理 + + // 节点3:起飞 + InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction()) + .onSuccess("execute_mission") // 起飞成功 -> 执行任务 + .onFailure("close_cover"); // 起飞失败 -> 关闭舱门 + + // 节点4:执行任务 + InstructionNode executeMissionNode = new InstructionNode("execute_mission", new ExecuteMissionInstruction()); + + // 节点5:处理恶劣天气 + InstructionNode handleBadWeatherNode = new InstructionNode("handle_bad_weather", new HandleBadWeatherInstruction()) + .onSuccess("wait_and_retry") // 可以等待 -> 等待后重试 + .onFailure("close_cover"); // 天气极差 -> 关闭舱门 + + // 节点6:等待并重试 + InstructionNode waitAndRetryNode = new InstructionNode("wait_and_retry", new WaitInstruction()) + .always("check_weather"); // 等待后 -> 重新检查天气 + + // 节点7:关闭舱门 + InstructionNode closeCoverNode = new InstructionNode("close_cover", new CloseCoverInstruction()); + + transaction.addNode(openCoverNode) + .addNode(checkWeatherNode) + .addNode(takeoffNode) + .addNode(executeMissionNode) + .addNode(handleBadWeatherNode) + .addNode(waitAndRetryNode) + .addNode(closeCoverNode) + .setStartNode("open_cover") + .setTimeout(300000); // 5分钟超时 + + return transaction; + } + + /** + * 示例4:状态回调结果直接作为事务结果 + * + * 流程: + * 1. 发送起飞指令 + * 2. 等待状态回调(canShortCircuit=false) + * 3. 没有配置下一步节点 + * 4. 状态回调的结果直接作为事务结果返回 + */ + public Transaction example4_StateCallbackAsResult() { + Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF); + + // 只有一个节点,配置了状态回调,但没有配置下一步 + InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction()); + // 注意:没有配置 onSuccess 或 onFailure + // 状态回调的结果将直接作为事务结果返回 + + transaction.addNode(takeoffNode) + .setStartNode("takeoff"); + + return transaction; + } + + // ==================== 示例指令实现 ==================== + + /** + * 检查设备状态指令 + */ + static class CheckDeviceInstruction extends AbstractInstruction { + @Override + public String getName() { + return "CHECK_DEVICE"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("检查设备状态: sn={}", context.getSn()); + // 发送MQTT消息检查设备状态 + } + + @Override + public CallbackConfig getStateCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/status") + .fieldPath("status") + .expectedValue("OK") + .canShortCircuit(false) // 必须等待状态回调 + .timeoutMs(10000) + .build(); + } + } + + /** + * 起飞指令(带状态回调) + */ + static class TakeOffWithStateCallbackInstruction extends AbstractInstruction { + @Override + public String getName() { + return "TAKE_OFF_WITH_STATE"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("发送起飞指令: sn={}", context.getSn()); + // 发送MQTT起飞指令 + } + + @Override + public CallbackConfig getMethodCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/response") + .fieldPath("cmd") + .expectedValue("takeoff") + .canShortCircuit(false) + .timeoutMs(10000) + .build(); + } + + @Override + public CallbackConfig getStateCallbackConfig(InstructionContext context) { + // 关键:canShortCircuit=false,表示必须根据状态回调结果决定下一步 + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/state") + .fieldPath("droneState") + .expectedValue("FLYING") + .canShortCircuit(false) // 必须等待状态回调,并根据结果决定下一步 + .timeoutMs(60000) + .build(); + } + } + + /** + * 简单起飞指令 + */ + static class TakeOffInstruction extends AbstractInstruction { + @Override + public String getName() { + return "TAKE_OFF"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("起飞: sn=", context.getSn()); + } + } + + /** + * 修复设备指令 + */ + static class RepairDeviceInstruction extends AbstractInstruction { + @Override + public String getName() { + return "REPAIR_DEVICE"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("修复设备: sn={}", context.getSn()); + } + } + + /** + * 开始任务指令 + */ + static class StartMissionInstruction extends AbstractInstruction { + @Override + public String getName() { + return "START_MISSION"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("开始任务: sn={}", context.getSn()); + } + } + + /** + * 重试起飞指令 + */ + static class RetryTakeOffInstruction extends AbstractInstruction { + @Override + public String getName() { + return "RETRY_TAKE_OFF"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("重试起飞: sn={}", context.getSn()); + } + + @Override + public CallbackConfig getStateCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("device/" + context.getSn() + "/state") + .fieldPath("droneState") + .expectedValue("FLYING") + .canShortCircuit(false) + .timeoutMs(60000) + .build(); + } + } + + /** + * 紧急降落指令 + */ + static class EmergencyLandInstruction extends AbstractInstruction { + @Override + public String getName() { + return "EMERGENCY_LAND"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("紧急降落: sn={}", context.getSn()); + } + } + + /** + * 打开舱门指令 + */ + static class OpenCoverInstruction extends AbstractInstruction { + @Override + public String getName() { + return "OPEN_COVER"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("打开舱门: sn={}", context.getSn()); + } + } + + /** + * 关闭舱门指令 + */ + static class CloseCoverInstruction extends AbstractInstruction { + @Override + public String getName() { + return "CLOSE_COVER"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("关闭舱门: sn={}", context.getSn()); + } + } + + /** + * 检查天气指令 + */ + static class CheckWeatherInstruction extends AbstractInstruction { + @Override + public String getName() { + return "CHECK_WEATHER"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("检查天气: sn={}", context.getSn()); + } + + @Override + public CallbackConfig getStateCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("weather/status") + .fieldPath("condition") + .expectedValue("GOOD") + .canShortCircuit(false) + .timeoutMs(10000) + .build(); + } + } + + /** + * 执行任务指令 + */ + static class ExecuteMissionInstruction extends AbstractInstruction { + @Override + public String getName() { + return "EXECUTE_MISSION"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("执行任务: sn={}", context.getSn()); + } + } + + /** + * 处理恶劣天气指令 + */ + static class HandleBadWeatherInstruction extends AbstractInstruction { + @Override + public String getName() { + return "HANDLE_BAD_WEATHER"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("处理恶劣天气: sn={}", context.getSn()); + } + + @Override + public CallbackConfig getStateCallbackConfig(InstructionContext context) { + return CallbackConfig.builder() + .topic("weather/status") + .fieldPath("severity") + .customPredicate(severity -> !"EXTREME".equals(severity)) + .canShortCircuit(false) + .timeoutMs(5000) + .build(); + } + } + + /** + * 等待指令 + */ + static class WaitInstruction extends AbstractInstruction { + @Override + public String getName() { + return "WAIT"; + } + + @Override + public void executeRemoteCall(InstructionContext context) throws Exception { + log.info("等待中: sn={}", context.getSn()); + Thread.sleep(30000); // 等待30秒 + } + } +} diff --git a/src/main/java/com/tuoheng/machine/instruction/InstructionNode.java b/src/main/java/com/tuoheng/machine/instruction/InstructionNode.java new file mode 100644 index 0000000..9641927 --- /dev/null +++ b/src/main/java/com/tuoheng/machine/instruction/InstructionNode.java @@ -0,0 +1,90 @@ +package com.tuoheng.machine.instruction; + +import lombok.Data; + +/** + * 指令节点(支持条件分支) + */ +@Data +public class InstructionNode { + /** + * 指令实例 + */ + private Instruction instruction; + + /** + * 成功后执行的下一个指令节点ID + */ + private String onSuccessNodeId; + + /** + * 失败后执行的下一个指令节点ID + */ + private String onFailureNodeId; + + /** + * 无论成功失败都执行的下一个指令节点ID(优先级最高) + */ + private String alwaysNextNodeId; + + /** + * 节点ID(唯一标识) + */ + private String nodeId; + + public InstructionNode(String nodeId, Instruction instruction) { + this.nodeId = nodeId; + this.instruction = instruction; + } + + /** + * 设置无论成功失败都执行的下一个节点 + */ + public InstructionNode always(String nextNodeId) { + this.alwaysNextNodeId = nextNodeId; + return this; + } + + /** + * 设置成功后执行的下一个节点 + */ + public InstructionNode onSuccess(String nextNodeId) { + this.onSuccessNodeId = nextNodeId; + return this; + } + + /** + * 设置失败后执行的下一个节点 + */ + public InstructionNode onFailure(String nextNodeId) { + this.onFailureNodeId = nextNodeId; + return this; + } + + /** + * 根据执行结果获取下一个节点ID + * + * @param success 是否成功 + * @return 下一个节点ID,如果没有则返回null + */ + public String getNextNodeId(boolean success) { + // 优先返回 always 配置 + if (alwaysNextNodeId != null) { + return alwaysNextNodeId; + } + + // 根据成功失败返回对应的节点 + if (success) { + return onSuccessNodeId; + } else { + return onFailureNodeId; + } + } + + /** + * 是否配置了下一个节点 + */ + public boolean hasNextNode(boolean success) { + return getNextNodeId(success) != null; + } +}