修改指令编排的方法

This commit is contained in:
孙小云 2025-12-17 11:16:09 +08:00
parent d6559bbe58
commit 8f518bca79
5 changed files with 1128 additions and 33 deletions

View File

@ -0,0 +1,364 @@
# 条件分支执行指南
## 概述
框架现在支持基于状态回调结果的条件分支执行。这意味着你可以根据指令的执行结果(成功/失败)动态决定下一步执行哪个指令。
## 核心概念
### 1. InstructionNode指令节点
指令节点是事务执行的基本单元,支持配置三种类型的后续节点:
```java
InstructionNode node = new InstructionNode("nodeId", instruction)
.always("nextNodeId") // 无论成功失败都执行
.onSuccess("successNodeId") // 成功时执行
.onFailure("failureNodeId"); // 失败时执行
```
**优先级**`always` > `onSuccess/onFailure`
### 2. 条件分支触发条件
只有当指令满足以下条件时,才会根据状态回调结果进行分支:
1. **配置了 `stateCallback`**(状态回调配置)
2. **`canShortCircuit = false`**(不可短路,必须等待状态回调)
```java
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/state")
.fieldPath("droneState")
.expectedValue("FLYING")
.canShortCircuit(false) // 关键:必须为 false
.timeoutMs(60000)
.build();
}
```
### 3. 分支决策逻辑
当指令配置了状态回调且 `canShortCircuit=false` 时:
1. **状态回调成功** → 执行 `onSuccess` 配置的节点
2. **状态回调失败/超时** → 执行 `onFailure` 配置的节点
3. **如果找不到对应的下一步节点** → 将状态回调结果作为事务结果返回
## 使用场景
### 场景1简单的成功/失败分支
```java
Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF);
// 检查设备状态
InstructionNode checkNode = new InstructionNode("check", new CheckDeviceInstruction())
.onSuccess("takeoff") // 状态正常 -> 起飞
.onFailure("repair"); // 状态异常 -> 修复
// 起飞
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction());
// 修复后再起飞
InstructionNode repairNode = new InstructionNode("repair", new RepairDeviceInstruction())
.always("takeoff");
transaction.addNode(checkNode)
.addNode(takeoffNode)
.addNode(repairNode)
.setStartNode("check");
```
**执行流程**
```
check (检查设备)
├─ 成功 → takeoff (起飞) → 结束
└─ 失败 → repair (修复) → takeoff (起飞) → 结束
```
### 场景2重试机制
```java
Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF);
// 起飞(带状态回调)
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction())
.onSuccess("start_mission") // 起飞成功 -> 开始任务
.onFailure("retry_takeoff"); // 起飞失败 -> 重试
// 开始任务
InstructionNode startMissionNode = new InstructionNode("start_mission", new StartMissionInstruction());
// 重试起飞
InstructionNode retryNode = new InstructionNode("retry_takeoff", new RetryTakeOffInstruction())
.onSuccess("start_mission") // 重试成功 -> 开始任务
.onFailure("emergency_land"); // 重试失败 -> 紧急降落
// 紧急降落
InstructionNode emergencyLandNode = new InstructionNode("emergency_land", new EmergencyLandInstruction());
transaction.addNode(takeoffNode)
.addNode(startMissionNode)
.addNode(retryNode)
.addNode(emergencyLandNode)
.setStartNode("takeoff");
```
**执行流程**
```
takeoff (起飞)
├─ 成功 → start_mission (开始任务) → 结束
└─ 失败 → retry_takeoff (重试)
├─ 成功 → start_mission (开始任务) → 结束
└─ 失败 → emergency_land (紧急降落) → 结束
```
### 场景3状态回调结果直接作为事务结果
```java
Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF);
// 只有一个节点,没有配置下一步
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction());
// 注意:没有配置 onSuccess 或 onFailure
transaction.addNode(takeoffNode)
.setStartNode("takeoff");
```
**执行流程**
```
takeoff (起飞,等待状态回调)
├─ 状态回调成功 → 事务成功(没有下一步节点)
└─ 状态回调失败 → 事务失败(没有下一步节点)
```
### 场景4复杂的多分支流程
```java
Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION);
// 1. 打开舱门
InstructionNode openCoverNode = new InstructionNode("open_cover", new OpenCoverInstruction())
.always("check_weather");
// 2. 检查天气
InstructionNode checkWeatherNode = new InstructionNode("check_weather", new CheckWeatherInstruction())
.onSuccess("takeoff") // 天气好 -> 起飞
.onFailure("handle_bad_weather"); // 天气差 -> 处理
// 3. 起飞
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction())
.onSuccess("execute_mission") // 起飞成功 -> 执行任务
.onFailure("close_cover"); // 起飞失败 -> 关闭舱门
// 4. 执行任务
InstructionNode executeMissionNode = new InstructionNode("execute_mission", new ExecuteMissionInstruction());
// 5. 处理恶劣天气
InstructionNode handleBadWeatherNode = new InstructionNode("handle_bad_weather", new HandleBadWeatherInstruction())
.onSuccess("wait_and_retry") // 可以等待 -> 等待后重试
.onFailure("close_cover"); // 天气极差 -> 关闭舱门
// 6. 等待并重试
InstructionNode waitAndRetryNode = new InstructionNode("wait_and_retry", new WaitInstruction())
.always("check_weather"); // 等待后 -> 重新检查天气
// 7. 关闭舱门
InstructionNode closeCoverNode = new InstructionNode("close_cover", new CloseCoverInstruction());
transaction.addNode(openCoverNode)
.addNode(checkWeatherNode)
.addNode(takeoffNode)
.addNode(executeMissionNode)
.addNode(handleBadWeatherNode)
.addNode(waitAndRetryNode)
.addNode(closeCoverNode)
.setStartNode("open_cover")
.setTimeout(300000);
```
**执行流程**
```
open_cover (打开舱门)
→ check_weather (检查天气)
├─ 天气好 → takeoff (起飞)
│ ├─ 成功 → execute_mission (执行任务) → 结束
│ └─ 失败 → close_cover (关闭舱门) → 结束
└─ 天气差 → handle_bad_weather (处理恶劣天气)
├─ 可等待 → wait_and_retry (等待) → check_weather (重新检查)
└─ 极差 → close_cover (关闭舱门) → 结束
```
## 指令实现要点
### 1. 配置状态回调以支持条件分支
```java
public class MyInstruction extends AbstractInstruction {
@Override
public String getName() {
return "MY_INSTRUCTION";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
// 发送MQTT指令
String sn = context.getSn();
mqttClient.publish("device/" + sn + "/command", "{\"cmd\":\"myCommand\"}");
}
@Override
public CallbackConfig getMethodCallbackConfig(InstructionContext context) {
// 方法回调等待指令ACK
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/response")
.fieldPath("cmd")
.expectedValue("myCommand")
.canShortCircuit(false)
.timeoutMs(10000)
.build();
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
// 状态回调:等待状态变化
// 关键canShortCircuit=false表示必须根据状态回调结果决定下一步
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/state")
.fieldPath("status")
.expectedValue("SUCCESS")
.canShortCircuit(false) // 必须为 false
.timeoutMs(60000)
.build();
}
}
```
### 2. 使用自定义判断逻辑
```java
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/state")
.customPredicate(messageBody -> {
// 自定义判断逻辑
Map<String, Object> data = (Map<String, Object>) messageBody;
String status = (String) data.get("status");
Integer errorCode = (Integer) data.get("errorCode");
// 只有状态为SUCCESS且错误码为0才算成功
return "SUCCESS".equals(status) && errorCode == 0;
})
.canShortCircuit(false)
.timeoutMs(60000)
.build();
}
```
## 与旧版本的兼容性
框架完全兼容旧版本的线性指令链:
```java
// 旧版本写法(仍然支持)
Transaction transaction = new Transaction("起飞", CommandType.TAKE_OFF)
.addInstruction(new Instruction1())
.addInstruction(new Instruction2())
.addInstruction(new Instruction3());
// 新版本写法(支持条件分支)
Transaction transaction = new Transaction("起飞", CommandType.TAKE_OFF)
.addNode(new InstructionNode("node1", new Instruction1()).always("node2"))
.addNode(new InstructionNode("node2", new Instruction2()).always("node3"))
.addNode(new InstructionNode("node3", new Instruction3()));
```
旧版本的 `addInstruction` 方法会自动转换为节点链,所有现有代码无需修改。
## 关键规则总结
1. **条件分支触发条件**
- 必须配置 `stateCallback`
- `canShortCircuit` 必须为 `false`
2. **分支优先级**
- `always` > `onSuccess/onFailure`
- 如果配置了 `always`,则无论成功失败都执行该节点
3. **找不到下一步节点时**
- 如果指令配置了状态回调且 `canShortCircuit=false`
- 但找不到对应的下一步节点(`onSuccess` 或 `onFailure`
- 则状态回调的结果直接作为事务结果返回
4. **超时处理**
- 状态回调超时视为失败
- 会执行 `onFailure` 配置的节点
- 如果没有配置 `onFailure`,则事务失败
5. **循环检测**
- 框架不会自动检测循环
- 请确保事务超时时间足够长
- 避免无限循环(如 A → B → A
## 调试技巧
1. **查看执行日志**
```
执行指令节点: nodeId=check, instruction=CHECK_DEVICE
根据状态回调结果选择下一个节点: success=true, nextNodeId=takeoff
执行指令节点: nodeId=takeoff, instruction=TAKE_OFF
状态回调完成,无下一步指令,事务结束: success=true, nodeId=takeoff
```
2. **使用 `onComplete` 回调记录指令执行结果**
```java
@Override
public void onComplete(InstructionContext context, InstructionResult result) {
log.info("指令执行完成: instruction={}, success={}, error={}",
getName(), result.isSuccess(), result.getErrorMessage());
}
```
3. **注册命令执行监听器**
```java
commandManager.registerCommandListener("debug-listener", (sn, result) -> {
log.info("命令执行完成: sn={}, commandType={}, success={}, failedInstruction={}",
sn, result.getCommandType(), result.isSuccess(), result.getFailedInstructionName());
});
```
## 最佳实践
1. **合理设置超时时间**
- 指令超时 < 事务超时
- 考虑重试次数和等待时间
2. **避免过深的分支嵌套**
- 建议分支深度不超过3层
- 复杂流程可以拆分为多个事务
3. **提供失败分支**
- 关键指令应该配置 `onFailure` 分支
- 避免因为一个指令失败导致整个事务失败
4. **使用有意义的节点ID**
- 使用描述性的节点ID便于调试
- 例如:`check_weather`、`takeoff`、`retry_takeoff`
5. **记录状态变化**
- 在指令的 `onComplete` 中记录执行结果
- 便于排查问题和优化流程
## 完整示例
参考 `ConditionalBranchExample.java` 文件包含4个完整的示例
- 示例1简单的成功/失败分支
- 示例2基于状态回调结果的分支
- 示例3复杂的多分支流程
- 示例4状态回调结果直接作为事务结果

View File

@ -1,13 +1,16 @@
package com.tuoheng.machine.command;
import com.tuoheng.machine.instruction.Instruction;
import com.tuoheng.machine.instruction.InstructionNode;
import lombok.Data;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 事务由多个指令组成
* 事务由多个指令节点组成支持条件分支
*/
@Data
public class Transaction {
@ -22,25 +25,98 @@ public class Transaction {
private CommandType commandType;
/**
* 指令列表按顺序执行
* 指令节点映射nodeId -> InstructionNode
*/
private List<Instruction> instructions = new ArrayList<>();
private Map<String, InstructionNode> nodeMap = new HashMap<>();
/**
* 起始节点ID
*/
private String startNodeId;
/**
* 事务超时时间毫秒
*/
private long timeoutMs = 120000; // 默认2分钟
/**
* 指令列表按顺序执行- 兼容旧版本
* @deprecated 使用 nodeMap 和条件分支代替
*/
@Deprecated
private List<Instruction> instructions = new ArrayList<>();
public Transaction(String name, CommandType commandType) {
this.name = name;
this.commandType = commandType;
}
/**
* 添加指令
* 添加指令节点
*/
public Transaction addNode(InstructionNode node) {
nodeMap.put(node.getNodeId(), node);
// 如果是第一个节点设置为起始节点
if (startNodeId == null) {
startNodeId = node.getNodeId();
}
return this;
}
/**
* 添加指令节点简化版
*/
public Transaction addNode(String nodeId, Instruction instruction) {
return addNode(new InstructionNode(nodeId, instruction));
}
/**
* 设置起始节点
*/
public Transaction setStartNode(String nodeId) {
if (!nodeMap.containsKey(nodeId)) {
throw new IllegalArgumentException("节点不存在: " + nodeId);
}
this.startNodeId = nodeId;
return this;
}
/**
* 获取指令节点
*/
public InstructionNode getNode(String nodeId) {
return nodeMap.get(nodeId);
}
/**
* 获取起始节点
*/
public InstructionNode getStartNode() {
return startNodeId != null ? nodeMap.get(startNodeId) : null;
}
/**
* 添加指令兼容旧版本自动转换为线性节点链
* @deprecated 使用 addNode 和条件分支代替
*/
@Deprecated
public Transaction addInstruction(Instruction instruction) {
this.instructions.add(instruction);
// 自动转换为节点
String nodeId = "node_" + instructions.size();
InstructionNode node = new InstructionNode(nodeId, instruction);
// 如果不是第一个节点将上一个节点的 always 指向当前节点
if (instructions.size() > 1) {
String prevNodeId = "node_" + (instructions.size() - 1);
InstructionNode prevNode = nodeMap.get(prevNodeId);
if (prevNode != null) {
prevNode.always(nodeId);
}
}
addNode(node);
return this;
}
@ -51,4 +127,18 @@ public class Transaction {
this.timeoutMs = timeoutMs;
return this;
}
/**
* 判断是否使用图结构新版本
*/
public boolean isGraphMode() {
return !nodeMap.isEmpty();
}
/**
* 判断是否使用线性结构旧版本
*/
public boolean isLinearMode() {
return !instructions.isEmpty() && nodeMap.isEmpty();
}
}

View File

@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 事务执行器
* 事务执行器支持条件分支
*/
@Slf4j
@Component
@ -34,20 +34,90 @@ public class TransactionExecutor {
// 在新线程中执行事务
CompletableFuture.runAsync(() -> {
try {
// 依次执行每个指令
for (Instruction instruction : transaction.getInstructions()) {
// 检查事务是否超时
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.timeout(transaction.getCommandType()));
return;
// 判断使用图模式还是线性模式
if (transaction.isGraphMode()) {
// 新版本图结构执行支持条件分支
executeGraphTransaction(transaction, context, startTime, future);
} else {
// 旧版本线性执行兼容旧代码
executeLinearTransaction(transaction, context, startTime, future);
}
} catch (Exception e) {
log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e);
future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage()));
}
});
return future;
}
/**
* 执行图结构事务新版本支持条件分支
*/
private void executeGraphTransaction(Transaction transaction, InstructionContext context,
long startTime, CompletableFuture<CommandResult> future) {
// 从起始节点开始执行
InstructionNode currentNode = transaction.getStartNode();
if (currentNode == null) {
log.error("事务没有起始节点: transaction={}", transaction.getName());
future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有起始节点"));
return;
}
// 循环执行节点直到没有下一个节点
while (currentNode != null) {
// 检查事务是否超时
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.timeout(transaction.getCommandType()));
return;
}
Instruction instruction = currentNode.getInstruction();
log.debug("执行指令节点: nodeId={}, instruction={}", currentNode.getNodeId(), instruction.getName());
// 执行指令
InstructionResult result = executeInstruction(instruction, context);
// 判断是否需要根据状态回调结果决定下一步
boolean shouldBranch = shouldBranchByStateCallback(instruction, context);
if (shouldBranch) {
// 根据状态回调结果决定下一步
String nextNodeId = currentNode.getNextNodeId(result.isSuccess());
if (nextNodeId == null) {
// 找不到下一步指令将状态回调结果作为事务结果返回
log.info("状态回调完成,无下一步指令,事务结束: success={}, nodeId={}",
result.isSuccess(), currentNode.getNodeId());
if (result.isSuccess()) {
future.complete(CommandResult.success(transaction.getCommandType(), result.getData()));
} else {
future.complete(CommandResult.failure(
transaction.getCommandType(),
result.getErrorMessage(),
instruction.getName()
));
}
return;
}
// 执行指令
InstructionResult result = executeInstruction(instruction, context);
// 获取下一个节点
currentNode = transaction.getNode(nextNodeId);
log.debug("根据状态回调结果选择下一个节点: success={}, nextNodeId={}",
result.isSuccess(), nextNodeId);
// 如果指令失败终止事务
if (!result.isSuccess()) {
} else {
// 不需要分支按照传统逻辑处理
if (!result.isSuccess()) {
// 指令失败检查是否有失败分支
String nextNodeId = currentNode.getNextNodeId(false);
if (nextNodeId != null) {
currentNode = transaction.getNode(nextNodeId);
log.debug("指令失败,跳转到失败分支: nextNodeId={}", nextNodeId);
} else {
// 没有失败分支终止事务
log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage());
future.complete(CommandResult.failure(
transaction.getCommandType(),
@ -56,21 +126,70 @@ public class TransactionExecutor {
));
return;
}
log.debug("指令执行成功: instruction={}", instruction.getName());
} else {
// 指令成功获取下一个节点
String nextNodeId = currentNode.getNextNodeId(true);
if (nextNodeId != null) {
currentNode = transaction.getNode(nextNodeId);
log.debug("指令成功,继续下一个节点: nextNodeId={}", nextNodeId);
} else {
// 没有下一个节点事务成功完成
log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.success(transaction.getCommandType()));
return;
}
}
// 所有指令执行成功
log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.success(transaction.getCommandType()));
} catch (Exception e) {
log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e);
future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage()));
}
});
}
return future;
// 正常结束虽然理论上不会走到这里
log.info("事务执行完成: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.success(transaction.getCommandType()));
}
/**
* 执行线性事务旧版本兼容旧代码
*/
private void executeLinearTransaction(Transaction transaction, InstructionContext context,
long startTime, CompletableFuture<CommandResult> future) {
// 依次执行每个指令
for (Instruction instruction : transaction.getInstructions()) {
// 检查事务是否超时
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.timeout(transaction.getCommandType()));
return;
}
// 执行指令
InstructionResult result = executeInstruction(instruction, context);
// 如果指令失败终止事务
if (!result.isSuccess()) {
log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage());
future.complete(CommandResult.failure(
transaction.getCommandType(),
result.getErrorMessage(),
instruction.getName()
));
return;
}
log.debug("指令执行成功: instruction={}", instruction.getName());
}
// 所有指令执行成功
log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.success(transaction.getCommandType()));
}
/**
* 判断是否需要根据状态回调结果决定下一步
* 条件配置了 stateCallback canShortCircuit false
*/
private boolean shouldBranchByStateCallback(Instruction instruction, InstructionContext context) {
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
return stateCallback != null && !stateCallback.isCanShortCircuit();
}
/**
@ -107,10 +226,9 @@ public class TransactionExecutor {
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
if (stateCallback != null && !stateCallback.isCanShortCircuit()) {
InstructionResult stateResult = waitForCallback(stateCallback, context);
if (!stateResult.isSuccess()) {
instruction.onComplete(context, stateResult);
return stateResult;
}
// 注意这里不立即返回而是将结果传递给上层由上层决定下一步
instruction.onComplete(context, stateResult);
return stateResult;
}
// 指令执行成功
@ -119,7 +237,7 @@ public class TransactionExecutor {
return result;
} catch (Exception e) {
log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
log.error("指令执行异常: instruction={}, sn=", instruction.getName(), context.getSn(), e);
InstructionResult result = InstructionResult.failure("指令执行异常: " + e.getMessage());
instruction.onComplete(context, result);
return result;

View File

@ -0,0 +1,433 @@
package com.tuoheng.machine.example;
import com.tuoheng.machine.command.CommandType;
import com.tuoheng.machine.command.Transaction;
import com.tuoheng.machine.instruction.AbstractInstruction;
import com.tuoheng.machine.instruction.CallbackConfig;
import com.tuoheng.machine.instruction.InstructionContext;
import com.tuoheng.machine.instruction.InstructionNode;
import lombok.extern.slf4j.Slf4j;
/**
* 条件分支执行示例
*
* 演示如何使用新的图结构和条件分支功能
*/
@Slf4j
public class ConditionalBranchExample {
/**
* 示例1简单的成功/失败分支
*
* 流程
* 1. 检查设备状态
* 2. 如果状态正常 -> 执行起飞
* 3. 如果状态异常 -> 执行修复操作
*/
public Transaction example1_SimpleConditionalBranch() {
Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF);
// 节点1检查设备状态
InstructionNode checkNode = new InstructionNode("check", new CheckDeviceInstruction())
.onSuccess("takeoff") // 成功 -> 起飞
.onFailure("repair"); // 失败 -> 修复
// 节点2起飞指令
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction());
// 起飞后没有下一步事务结束
// 节点3修复指令
InstructionNode repairNode = new InstructionNode("repair", new RepairDeviceInstruction())
.always("takeoff"); // 修复后 -> 起飞
// 添加所有节点
transaction.addNode(checkNode)
.addNode(takeoffNode)
.addNode(repairNode)
.setStartNode("check");
return transaction;
}
/**
* 示例2基于状态回调结果的分支
*
* 流程
* 1. 发送起飞指令
* 2. 等待状态回调stateCallbackcanShortCircuit=false
* 3. 如果起飞成功 -> 开始任务
* 4. 如果起飞失败 -> 重试起飞
*/
public Transaction example2_StateCallbackBranch() {
Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF);
// 节点1起飞指令配置了状态回调
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction())
.onSuccess("start_mission") // 状态回调成功 -> 开始任务
.onFailure("retry_takeoff"); // 状态回调失败 -> 重试
// 节点2开始任务
InstructionNode startMissionNode = new InstructionNode("start_mission", new StartMissionInstruction());
// 节点3重试起飞
InstructionNode retryNode = new InstructionNode("retry_takeoff", new RetryTakeOffInstruction())
.onSuccess("start_mission") // 重试成功 -> 开始任务
.onFailure("emergency_land"); // 重试失败 -> 紧急降落
// 节点4紧急降落
InstructionNode emergencyLandNode = new InstructionNode("emergency_land", new EmergencyLandInstruction());
transaction.addNode(takeoffNode)
.addNode(startMissionNode)
.addNode(retryNode)
.addNode(emergencyLandNode)
.setStartNode("takeoff");
return transaction;
}
/**
* 示例3复杂的多分支流程
*
* 流程
* 1. 打开舱门
* 2. 检查天气
* 3. 如果天气好 -> 起飞 -> 执行任务
* 4. 如果天气差 -> 等待 -> 重新检查天气
* 5. 如果天气极差 -> 关闭舱门 -> 取消任务
*/
public Transaction example3_ComplexBranch() {
Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION);
// 节点1打开舱门
InstructionNode openCoverNode = new InstructionNode("open_cover", new OpenCoverInstruction())
.always("check_weather");
// 节点2检查天气
InstructionNode checkWeatherNode = new InstructionNode("check_weather", new CheckWeatherInstruction())
.onSuccess("takeoff") // 天气好 -> 起飞
.onFailure("handle_bad_weather"); // 天气差 -> 处理
// 节点3起飞
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction())
.onSuccess("execute_mission") // 起飞成功 -> 执行任务
.onFailure("close_cover"); // 起飞失败 -> 关闭舱门
// 节点4执行任务
InstructionNode executeMissionNode = new InstructionNode("execute_mission", new ExecuteMissionInstruction());
// 节点5处理恶劣天气
InstructionNode handleBadWeatherNode = new InstructionNode("handle_bad_weather", new HandleBadWeatherInstruction())
.onSuccess("wait_and_retry") // 可以等待 -> 等待后重试
.onFailure("close_cover"); // 天气极差 -> 关闭舱门
// 节点6等待并重试
InstructionNode waitAndRetryNode = new InstructionNode("wait_and_retry", new WaitInstruction())
.always("check_weather"); // 等待后 -> 重新检查天气
// 节点7关闭舱门
InstructionNode closeCoverNode = new InstructionNode("close_cover", new CloseCoverInstruction());
transaction.addNode(openCoverNode)
.addNode(checkWeatherNode)
.addNode(takeoffNode)
.addNode(executeMissionNode)
.addNode(handleBadWeatherNode)
.addNode(waitAndRetryNode)
.addNode(closeCoverNode)
.setStartNode("open_cover")
.setTimeout(300000); // 5分钟超时
return transaction;
}
/**
* 示例4状态回调结果直接作为事务结果
*
* 流程
* 1. 发送起飞指令
* 2. 等待状态回调canShortCircuit=false
* 3. 没有配置下一步节点
* 4. 状态回调的结果直接作为事务结果返回
*/
public Transaction example4_StateCallbackAsResult() {
Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF);
// 只有一个节点配置了状态回调但没有配置下一步
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction());
// 注意没有配置 onSuccess onFailure
// 状态回调的结果将直接作为事务结果返回
transaction.addNode(takeoffNode)
.setStartNode("takeoff");
return transaction;
}
// ==================== 示例指令实现 ====================
/**
* 检查设备状态指令
*/
static class CheckDeviceInstruction extends AbstractInstruction {
@Override
public String getName() {
return "CHECK_DEVICE";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("检查设备状态: sn={}", context.getSn());
// 发送MQTT消息检查设备状态
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/status")
.fieldPath("status")
.expectedValue("OK")
.canShortCircuit(false) // 必须等待状态回调
.timeoutMs(10000)
.build();
}
}
/**
* 起飞指令带状态回调
*/
static class TakeOffWithStateCallbackInstruction extends AbstractInstruction {
@Override
public String getName() {
return "TAKE_OFF_WITH_STATE";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("发送起飞指令: sn={}", context.getSn());
// 发送MQTT起飞指令
}
@Override
public CallbackConfig getMethodCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/response")
.fieldPath("cmd")
.expectedValue("takeoff")
.canShortCircuit(false)
.timeoutMs(10000)
.build();
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
// 关键canShortCircuit=false表示必须根据状态回调结果决定下一步
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/state")
.fieldPath("droneState")
.expectedValue("FLYING")
.canShortCircuit(false) // 必须等待状态回调并根据结果决定下一步
.timeoutMs(60000)
.build();
}
}
/**
* 简单起飞指令
*/
static class TakeOffInstruction extends AbstractInstruction {
@Override
public String getName() {
return "TAKE_OFF";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("起飞: sn=", context.getSn());
}
}
/**
* 修复设备指令
*/
static class RepairDeviceInstruction extends AbstractInstruction {
@Override
public String getName() {
return "REPAIR_DEVICE";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("修复设备: sn={}", context.getSn());
}
}
/**
* 开始任务指令
*/
static class StartMissionInstruction extends AbstractInstruction {
@Override
public String getName() {
return "START_MISSION";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("开始任务: sn={}", context.getSn());
}
}
/**
* 重试起飞指令
*/
static class RetryTakeOffInstruction extends AbstractInstruction {
@Override
public String getName() {
return "RETRY_TAKE_OFF";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("重试起飞: sn={}", context.getSn());
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("device/" + context.getSn() + "/state")
.fieldPath("droneState")
.expectedValue("FLYING")
.canShortCircuit(false)
.timeoutMs(60000)
.build();
}
}
/**
* 紧急降落指令
*/
static class EmergencyLandInstruction extends AbstractInstruction {
@Override
public String getName() {
return "EMERGENCY_LAND";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("紧急降落: sn={}", context.getSn());
}
}
/**
* 打开舱门指令
*/
static class OpenCoverInstruction extends AbstractInstruction {
@Override
public String getName() {
return "OPEN_COVER";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("打开舱门: sn={}", context.getSn());
}
}
/**
* 关闭舱门指令
*/
static class CloseCoverInstruction extends AbstractInstruction {
@Override
public String getName() {
return "CLOSE_COVER";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("关闭舱门: sn={}", context.getSn());
}
}
/**
* 检查天气指令
*/
static class CheckWeatherInstruction extends AbstractInstruction {
@Override
public String getName() {
return "CHECK_WEATHER";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("检查天气: sn={}", context.getSn());
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("weather/status")
.fieldPath("condition")
.expectedValue("GOOD")
.canShortCircuit(false)
.timeoutMs(10000)
.build();
}
}
/**
* 执行任务指令
*/
static class ExecuteMissionInstruction extends AbstractInstruction {
@Override
public String getName() {
return "EXECUTE_MISSION";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("执行任务: sn={}", context.getSn());
}
}
/**
* 处理恶劣天气指令
*/
static class HandleBadWeatherInstruction extends AbstractInstruction {
@Override
public String getName() {
return "HANDLE_BAD_WEATHER";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("处理恶劣天气: sn={}", context.getSn());
}
@Override
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
return CallbackConfig.builder()
.topic("weather/status")
.fieldPath("severity")
.customPredicate(severity -> !"EXTREME".equals(severity))
.canShortCircuit(false)
.timeoutMs(5000)
.build();
}
}
/**
* 等待指令
*/
static class WaitInstruction extends AbstractInstruction {
@Override
public String getName() {
return "WAIT";
}
@Override
public void executeRemoteCall(InstructionContext context) throws Exception {
log.info("等待中: sn={}", context.getSn());
Thread.sleep(30000); // 等待30秒
}
}
}

View File

@ -0,0 +1,90 @@
package com.tuoheng.machine.instruction;
import lombok.Data;
/**
* 指令节点支持条件分支
*/
@Data
public class InstructionNode {
/**
* 指令实例
*/
private Instruction instruction;
/**
* 成功后执行的下一个指令节点ID
*/
private String onSuccessNodeId;
/**
* 失败后执行的下一个指令节点ID
*/
private String onFailureNodeId;
/**
* 无论成功失败都执行的下一个指令节点ID优先级最高
*/
private String alwaysNextNodeId;
/**
* 节点ID唯一标识
*/
private String nodeId;
public InstructionNode(String nodeId, Instruction instruction) {
this.nodeId = nodeId;
this.instruction = instruction;
}
/**
* 设置无论成功失败都执行的下一个节点
*/
public InstructionNode always(String nextNodeId) {
this.alwaysNextNodeId = nextNodeId;
return this;
}
/**
* 设置成功后执行的下一个节点
*/
public InstructionNode onSuccess(String nextNodeId) {
this.onSuccessNodeId = nextNodeId;
return this;
}
/**
* 设置失败后执行的下一个节点
*/
public InstructionNode onFailure(String nextNodeId) {
this.onFailureNodeId = nextNodeId;
return this;
}
/**
* 根据执行结果获取下一个节点ID
*
* @param success 是否成功
* @return 下一个节点ID如果没有则返回null
*/
public String getNextNodeId(boolean success) {
// 优先返回 always 配置
if (alwaysNextNodeId != null) {
return alwaysNextNodeId;
}
// 根据成功失败返回对应的节点
if (success) {
return onSuccessNodeId;
} else {
return onFailureNodeId;
}
}
/**
* 是否配置了下一个节点
*/
public boolean hasNextNode(boolean success) {
return getNextNodeId(success) != null;
}
}