修改调用链
This commit is contained in:
parent
419230fa7e
commit
9903a75ae1
|
|
@ -2,29 +2,36 @@
|
|||
|
||||
## 概述
|
||||
|
||||
框架现在支持基于状态回调结果的条件分支执行。这意味着你可以根据指令的执行结果(成功/失败)动态决定下一步执行哪个指令。
|
||||
框架现在支持基于状态回调结果的条件分支执行。指令自己维护下游节点关系,形成树状结构,可以根据执行结果(成功/失败)动态决定下一步执行哪个指令。
|
||||
|
||||
## 核心概念
|
||||
|
||||
### 1. InstructionNode(指令节点)
|
||||
### 1. Instruction 自维护下游节点
|
||||
|
||||
指令节点是事务执行的基本单元,支持配置三种类型的后续节点:
|
||||
每个指令可以配置三种类型的下游指令:
|
||||
|
||||
```java
|
||||
InstructionNode node = new InstructionNode("nodeId", instruction)
|
||||
.always("nextNodeId") // 无论成功失败都执行
|
||||
.onSuccess("successNodeId") // 成功时执行
|
||||
.onFailure("failureNodeId"); // 失败时执行
|
||||
CheckDeviceInstruction checkInstruction = new CheckDeviceInstruction()
|
||||
.onSuccess(new TakeOffInstruction()) // 成功时执行
|
||||
.onFailure(new RepairDeviceInstruction()) // 失败时执行
|
||||
.then(new LogInstruction()); // 无论成功失败都执行
|
||||
```
|
||||
|
||||
**优先级**:`always` > `onSuccess/onFailure`
|
||||
**优先级**:`then` (always) > `onSuccess/onFailure`
|
||||
|
||||
### 2. 条件分支触发条件
|
||||
### 2. Transaction 持有根指令
|
||||
|
||||
只有当指令满足以下条件时,才会根据状态回调结果进行分支:
|
||||
Transaction 只需要持有根指令,指令树由指令自己维护:
|
||||
|
||||
1. **配置了 `stateCallback`**(状态回调配置)
|
||||
2. **`canShortCircuit = false`**(不可短路,必须等待状态回调)
|
||||
```java
|
||||
Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF)
|
||||
.root(checkInstruction)
|
||||
.setTimeout(120000);
|
||||
```
|
||||
|
||||
### 3. 条件分支触发条件
|
||||
|
||||
只有当指令**配置了 `StateCallbackConfig`** 时,才会根据状态回调结果执行下游节点:
|
||||
|
||||
```java
|
||||
@Override
|
||||
|
|
@ -33,43 +40,41 @@ public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
|||
.topic("device/" + context.getSn() + "/state")
|
||||
.fieldPath("droneState")
|
||||
.expectedValue("FLYING")
|
||||
.canShortCircuit(false) // 关键:必须为 false
|
||||
.timeoutMs(60000)
|
||||
.build();
|
||||
}
|
||||
```
|
||||
|
||||
### 3. 分支决策逻辑
|
||||
**核心规则**:
|
||||
- **配置了 StateCallbackConfig** → 必须等待状态回调,下游节点的执行依赖于回调结果
|
||||
- **没有配置 StateCallbackConfig** → 当前指令的结果即为事务结果,不会执行下游节点
|
||||
|
||||
当指令配置了状态回调且 `canShortCircuit=false` 时:
|
||||
### 4. 分支决策逻辑
|
||||
|
||||
1. **状态回调成功** → 执行 `onSuccess` 配置的节点
|
||||
2. **状态回调失败/超时** → 执行 `onFailure` 配置的节点
|
||||
3. **如果找不到对应的下一步节点** → 将状态回调结果作为事务结果返回
|
||||
当指令配置了状态回调时:
|
||||
|
||||
1. **状态回调成功** → 执行 `then` 或 `onSuccess` 配置的指令
|
||||
2. **状态回调失败/超时** → 执行 `then` 或 `onFailure` 配置的指令
|
||||
3. **如果找不到对应的下游指令** → 将状态回调结果作为事务结果返回
|
||||
|
||||
## 使用场景
|
||||
|
||||
### 场景1:简单的成功/失败分支
|
||||
|
||||
```java
|
||||
Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF);
|
||||
// 创建指令
|
||||
TakeOffInstruction takeoffInstruction = new TakeOffInstruction();
|
||||
RepairDeviceInstruction repairInstruction = new RepairDeviceInstruction();
|
||||
|
||||
// 检查设备状态
|
||||
InstructionNode checkNode = new InstructionNode("check", new CheckDeviceInstruction())
|
||||
.onSuccess("takeoff") // 状态正常 -> 起飞
|
||||
.onFailure("repair"); // 状态异常 -> 修复
|
||||
// 配置指令树
|
||||
CheckDeviceInstruction checkInstruction = new CheckDeviceInstruction()
|
||||
.onSuccess(takeoffInstruction) // 状态正常 -> 起飞
|
||||
.onFailure(repairInstruction // 状态异常 -> 修复
|
||||
.then(takeoffInstruction)); // 修复后 -> 起飞
|
||||
|
||||
// 起飞
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction());
|
||||
|
||||
// 修复后再起飞
|
||||
InstructionNode repairNode = new InstructionNode("repair", new RepairDeviceInstruction())
|
||||
.always("takeoff");
|
||||
|
||||
transaction.addNode(checkNode)
|
||||
.addNode(takeoffNode)
|
||||
.addNode(repairNode)
|
||||
.setStartNode("check");
|
||||
// 创建事务
|
||||
Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF)
|
||||
.root(checkInstruction);
|
||||
```
|
||||
|
||||
**执行流程**:
|
||||
|
|
@ -82,29 +87,23 @@ check (检查设备)
|
|||
### 场景2:重试机制
|
||||
|
||||
```java
|
||||
Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF);
|
||||
// 创建指令
|
||||
StartMissionInstruction startMissionInstruction = new StartMissionInstruction();
|
||||
EmergencyLandInstruction emergencyLandInstruction = new EmergencyLandInstruction();
|
||||
|
||||
// 起飞(带状态回调)
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction())
|
||||
.onSuccess("start_mission") // 起飞成功 -> 开始任务
|
||||
.onFailure("retry_takeoff"); // 起飞失败 -> 重试
|
||||
// 重试起飞指令
|
||||
RetryTakeOffInstruction retryInstruction = new RetryTakeOffInstruction()
|
||||
.onSuccess(startMissionInstruction) // 重试成功 -> 开始任务
|
||||
.onFailure(emergencyLandInstruction); // 重试失败 -> 紧急降落
|
||||
|
||||
// 开始任务
|
||||
InstructionNode startMissionNode = new InstructionNode("start_mission", new StartMissionInstruction());
|
||||
// 起飞指令(带状态回调)
|
||||
TakeOffWithStateCallbackInstruction takeoffInstruction = new TakeOffWithStateCallbackInstruction()
|
||||
.onSuccess(startMissionInstruction) // 起飞成功 -> 开始任务
|
||||
.onFailure(retryInstruction); // 起飞失败 -> 重试
|
||||
|
||||
// 重试起飞
|
||||
InstructionNode retryNode = new InstructionNode("retry_takeoff", new RetryTakeOffInstruction())
|
||||
.onSuccess("start_mission") // 重试成功 -> 开始任务
|
||||
.onFailure("emergency_land"); // 重试失败 -> 紧急降落
|
||||
|
||||
// 紧急降落
|
||||
InstructionNode emergencyLandNode = new InstructionNode("emergency_land", new EmergencyLandInstruction());
|
||||
|
||||
transaction.addNode(takeoffNode)
|
||||
.addNode(startMissionNode)
|
||||
.addNode(retryNode)
|
||||
.addNode(emergencyLandNode)
|
||||
.setStartNode("takeoff");
|
||||
// 创建事务
|
||||
Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF)
|
||||
.root(takeoffInstruction);
|
||||
```
|
||||
|
||||
**执行流程**:
|
||||
|
|
@ -119,66 +118,54 @@ takeoff (起飞)
|
|||
### 场景3:状态回调结果直接作为事务结果
|
||||
|
||||
```java
|
||||
Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF);
|
||||
|
||||
// 只有一个节点,没有配置下一步
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction());
|
||||
// 只有一个指令,没有配置下游
|
||||
TakeOffWithStateCallbackInstruction takeoffInstruction = new TakeOffWithStateCallbackInstruction();
|
||||
// 注意:没有配置 onSuccess 或 onFailure
|
||||
|
||||
transaction.addNode(takeoffNode)
|
||||
.setStartNode("takeoff");
|
||||
Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF)
|
||||
.root(takeoffInstruction);
|
||||
```
|
||||
|
||||
**执行流程**:
|
||||
```
|
||||
takeoff (起飞,等待状态回调)
|
||||
├─ 状态回调成功 → 事务成功(没有下一步节点)
|
||||
└─ 状态回调失败 → 事务失败(没有下一步节点)
|
||||
├─ 状态回调成功 → 事务成功(没有下游指令)
|
||||
└─ 状态回调失败 → 事务失败(没有下游指令)
|
||||
```
|
||||
|
||||
### 场景4:复杂的多分支流程
|
||||
|
||||
```java
|
||||
Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION);
|
||||
// 创建所有指令
|
||||
OpenCoverInstruction openCoverInstruction = new OpenCoverInstruction();
|
||||
CheckWeatherInstruction checkWeatherInstruction = new CheckWeatherInstruction();
|
||||
TakeOffInstruction takeoffInstruction = new TakeOffInstruction();
|
||||
ExecuteMissionInstruction executeMissionInstruction = new ExecuteMissionInstruction();
|
||||
HandleBadWeatherInstruction handleBadWeatherInstruction = new HandleBadWeatherInstruction();
|
||||
WaitInstruction waitInstruction = new WaitInstruction();
|
||||
CloseCoverInstruction closeCoverInstruction = new CloseCoverInstruction();
|
||||
|
||||
// 1. 打开舱门
|
||||
InstructionNode openCoverNode = new InstructionNode("open_cover", new OpenCoverInstruction())
|
||||
.always("check_weather");
|
||||
// 配置指令树(从叶子节点开始构建)
|
||||
waitInstruction.then(checkWeatherInstruction); // 等待后 -> 重新检查天气
|
||||
|
||||
// 2. 检查天气
|
||||
InstructionNode checkWeatherNode = new InstructionNode("check_weather", new CheckWeatherInstruction())
|
||||
.onSuccess("takeoff") // 天气好 -> 起飞
|
||||
.onFailure("handle_bad_weather"); // 天气差 -> 处理
|
||||
handleBadWeatherInstruction
|
||||
.onSuccess(waitInstruction) // 可以等待 -> 等待后重试
|
||||
.onFailure(closeCoverInstruction); // 天气极差 -> 关闭舱门
|
||||
|
||||
// 3. 起飞
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction())
|
||||
.onSuccess("execute_mission") // 起飞成功 -> 执行任务
|
||||
.onFailure("close_cover"); // 起飞失败 -> 关闭舱门
|
||||
takeoffInstruction
|
||||
.onSuccess(executeMissionInstruction) // 起飞成功 -> 执行任务
|
||||
.onFailure(closeCoverInstruction); // 起飞失败 -> 关闭舱门
|
||||
|
||||
// 4. 执行任务
|
||||
InstructionNode executeMissionNode = new InstructionNode("execute_mission", new ExecuteMissionInstruction());
|
||||
checkWeatherInstruction
|
||||
.onSuccess(takeoffInstruction) // 天气好 -> 起飞
|
||||
.onFailure(handleBadWeatherInstruction); // 天气差 -> 处理
|
||||
|
||||
// 5. 处理恶劣天气
|
||||
InstructionNode handleBadWeatherNode = new InstructionNode("handle_bad_weather", new HandleBadWeatherInstruction())
|
||||
.onSuccess("wait_and_retry") // 可以等待 -> 等待后重试
|
||||
.onFailure("close_cover"); // 天气极差 -> 关闭舱门
|
||||
openCoverInstruction.then(checkWeatherInstruction); // 打开舱门 -> 检查天气
|
||||
|
||||
// 6. 等待并重试
|
||||
InstructionNode waitAndRetryNode = new InstructionNode("wait_and_retry", new WaitInstruction())
|
||||
.always("check_weather"); // 等待后 -> 重新检查天气
|
||||
|
||||
// 7. 关闭舱门
|
||||
InstructionNode closeCoverNode = new InstructionNode("close_cover", new CloseCoverInstruction());
|
||||
|
||||
transaction.addNode(openCoverNode)
|
||||
.addNode(checkWeatherNode)
|
||||
.addNode(takeoffNode)
|
||||
.addNode(executeMissionNode)
|
||||
.addNode(handleBadWeatherNode)
|
||||
.addNode(waitAndRetryNode)
|
||||
.addNode(closeCoverNode)
|
||||
.setStartNode("open_cover")
|
||||
.setTimeout(300000);
|
||||
// 创建事务
|
||||
Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION)
|
||||
.root(openCoverInstruction)
|
||||
.setTimeout(300000);
|
||||
```
|
||||
|
||||
**执行流程**:
|
||||
|
|
@ -195,7 +182,9 @@ open_cover (打开舱门)
|
|||
|
||||
## 指令实现要点
|
||||
|
||||
### 1. 配置状态回调以支持条件分支
|
||||
### 1. 继承 AbstractInstruction
|
||||
|
||||
所有指令都应该继承 `AbstractInstruction`,它提供了下游节点管理功能:
|
||||
|
||||
```java
|
||||
public class MyInstruction extends AbstractInstruction {
|
||||
|
|
@ -219,7 +208,6 @@ public class MyInstruction extends AbstractInstruction {
|
|||
.topic("device/" + context.getSn() + "/response")
|
||||
.fieldPath("cmd")
|
||||
.expectedValue("myCommand")
|
||||
.canShortCircuit(false)
|
||||
.timeoutMs(10000)
|
||||
.build();
|
||||
}
|
||||
|
|
@ -227,12 +215,11 @@ public class MyInstruction extends AbstractInstruction {
|
|||
@Override
|
||||
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
||||
// 状态回调:等待状态变化
|
||||
// 关键:canShortCircuit=false,表示必须根据状态回调结果决定下一步
|
||||
// 配置了状态回调,则必须等待回调结果,下游节点的执行依赖于此
|
||||
return CallbackConfig.builder()
|
||||
.topic("device/" + context.getSn() + "/state")
|
||||
.fieldPath("status")
|
||||
.expectedValue("SUCCESS")
|
||||
.canShortCircuit(false) // 必须为 false
|
||||
.timeoutMs(60000)
|
||||
.build();
|
||||
}
|
||||
|
|
@ -255,65 +242,79 @@ public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
|||
// 只有状态为SUCCESS且错误码为0才算成功
|
||||
return "SUCCESS".equals(status) && errorCode == 0;
|
||||
})
|
||||
.canShortCircuit(false)
|
||||
.timeoutMs(60000)
|
||||
.build();
|
||||
}
|
||||
```
|
||||
|
||||
## 与旧版本的兼容性
|
||||
|
||||
框架完全兼容旧版本的线性指令链:
|
||||
### 3. 链式配置下游指令
|
||||
|
||||
```java
|
||||
// 旧版本写法(仍然支持)
|
||||
Transaction transaction = new Transaction("起飞", CommandType.TAKE_OFF)
|
||||
.addInstruction(new Instruction1())
|
||||
.addInstruction(new Instruction2())
|
||||
.addInstruction(new Instruction3());
|
||||
// 方式1:直接链式配置
|
||||
CheckDeviceInstruction checkInstruction = new CheckDeviceInstruction()
|
||||
.onSuccess(new TakeOffInstruction())
|
||||
.onFailure(new RepairDeviceInstruction());
|
||||
|
||||
// 新版本写法(支持条件分支)
|
||||
Transaction transaction = new Transaction("起飞", CommandType.TAKE_OFF)
|
||||
.addNode(new InstructionNode("node1", new Instruction1()).always("node2"))
|
||||
.addNode(new InstructionNode("node2", new Instruction2()).always("node3"))
|
||||
.addNode(new InstructionNode("node3", new Instruction3()));
|
||||
// 方式2:先创建再配置(适合复杂场景)
|
||||
TakeOffInstruction takeoffInstruction = new TakeOffInstruction();
|
||||
RepairDeviceInstruction repairInstruction = new RepairDeviceInstruction();
|
||||
|
||||
CheckDeviceInstruction checkInstruction = new CheckDeviceInstruction();
|
||||
checkInstruction.onSuccess(takeoffInstruction);
|
||||
checkInstruction.onFailure(repairInstruction);
|
||||
|
||||
// 方式3:指令复用(同一个指令实例可以被多个上游引用)
|
||||
TakeOffInstruction takeoffInstruction = new TakeOffInstruction();
|
||||
|
||||
CheckDeviceInstruction checkInstruction = new CheckDeviceInstruction()
|
||||
.onSuccess(takeoffInstruction);
|
||||
|
||||
RepairDeviceInstruction repairInstruction = new RepairDeviceInstruction()
|
||||
.then(takeoffInstruction); // 复用同一个 takeoff 实例
|
||||
```
|
||||
|
||||
旧版本的 `addInstruction` 方法会自动转换为节点链,所有现有代码无需修改。
|
||||
|
||||
## 关键规则总结
|
||||
|
||||
1. **条件分支触发条件**:
|
||||
- 必须配置 `stateCallback`
|
||||
- `canShortCircuit` 必须为 `false`
|
||||
1. **下游节点执行的前提**:
|
||||
- **必须配置 `StateCallbackConfig`** - 下游节点的执行强依赖状态回调
|
||||
- **没有配置 `StateCallbackConfig`** - 当前指令的结果即为事务结果,不会执行下游节点
|
||||
|
||||
2. **分支优先级**:
|
||||
- `always` > `onSuccess/onFailure`
|
||||
- 如果配置了 `always`,则无论成功失败都执行该节点
|
||||
- `then` (always) > `onSuccess/onFailure`
|
||||
- 如果配置了 `then`,则无论成功失败都执行该指令
|
||||
|
||||
3. **找不到下一步节点时**:
|
||||
- 如果指令配置了状态回调且 `canShortCircuit=false`
|
||||
- 但找不到对应的下一步节点(`onSuccess` 或 `onFailure`)
|
||||
3. **找不到下游指令时**:
|
||||
- 如果指令配置了状态回调
|
||||
- 但找不到对应的下游指令(`onSuccess` 或 `onFailure`)
|
||||
- 则状态回调的结果直接作为事务结果返回
|
||||
|
||||
4. **超时处理**:
|
||||
- 状态回调超时视为失败
|
||||
- 会执行 `onFailure` 配置的节点
|
||||
- 如果没有配置 `onFailure`,则事务失败
|
||||
4. **指令执行失败**:
|
||||
- `canExecute()` 返回 false → 事务失败
|
||||
- `executeRemoteCall()` 抛异常 → 事务失败
|
||||
- `MethodCallbackConfig` 失败 → 事务失败
|
||||
|
||||
5. **循环检测**:
|
||||
5. **超时处理**:
|
||||
- 方法回调超时 → 事务失败
|
||||
- 状态回调超时 → 视为失败,执行 `onFailure` 或 `then` 配置的指令
|
||||
- 如果没有配置失败分支,则事务失败
|
||||
|
||||
6. **循环检测**:
|
||||
- 框架不会自动检测循环
|
||||
- 请确保事务超时时间足够长
|
||||
- 避免无限循环(如 A → B → A)
|
||||
|
||||
7. **指令复用**:
|
||||
- 同一个指令实例可以被多个上游指令引用
|
||||
- 适用于多个分支最终汇聚到同一个指令的场景
|
||||
|
||||
## 调试技巧
|
||||
|
||||
1. **查看执行日志**:
|
||||
```
|
||||
执行指令节点: nodeId=check, instruction=CHECK_DEVICE
|
||||
根据状态回调结果选择下一个节点: success=true, nextNodeId=takeoff
|
||||
执行指令节点: nodeId=takeoff, instruction=TAKE_OFF
|
||||
状态回调完成,无下一步指令,事务结束: success=true, nodeId=takeoff
|
||||
执行指令: instruction=CHECK_DEVICE
|
||||
根据状态回调结果选择下一个指令: success=true, nextInstruction=TAKE_OFF
|
||||
执行指令: instruction=TAKE_OFF
|
||||
状态回调完成,无下一步指令,事务结束: success=true, instruction=TAKE_OFF
|
||||
```
|
||||
|
||||
2. **使用 `onComplete` 回调记录指令执行结果**:
|
||||
|
|
@ -347,18 +348,53 @@ commandManager.registerCommandListener("debug-listener", (sn, result) -> {
|
|||
- 关键指令应该配置 `onFailure` 分支
|
||||
- 避免因为一个指令失败导致整个事务失败
|
||||
|
||||
4. **使用有意义的节点ID**:
|
||||
- 使用描述性的节点ID,便于调试
|
||||
- 例如:`check_weather`、`takeoff`、`retry_takeoff`
|
||||
4. **合理复用指令实例**:
|
||||
- 多个分支汇聚到同一个指令时,复用同一个实例
|
||||
- 减少对象创建,提高性能
|
||||
|
||||
5. **记录状态变化**:
|
||||
- 在指令的 `onComplete` 中记录执行结果
|
||||
- 便于排查问题和优化流程
|
||||
|
||||
## 完整示例
|
||||
6. **从叶子节点开始构建**:
|
||||
- 复杂的指令树建议从叶子节点开始构建
|
||||
- 先创建所有指令实例,再配置它们之间的关系
|
||||
- 这样更容易理解和维护
|
||||
|
||||
参考 `ConditionalBranchExample.java` 文件,包含4个完整的示例:
|
||||
- 示例1:简单的成功/失败分支
|
||||
- 示例2:基于状态回调结果的分支
|
||||
- 示例3:复杂的多分支流程
|
||||
- 示例4:状态回调结果直接作为事务结果
|
||||
## 架构优势
|
||||
|
||||
1. **职责清晰**:指令自己管理下游关系,符合单一职责原则
|
||||
2. **结构简单**:Transaction 只是一个容器,不需要维护复杂的节点映射
|
||||
3. **易于理解**:指令树的关系直接体现在指令对象上
|
||||
4. **灵活复用**:同一个指令实例可以被多个上游指令引用
|
||||
5. **面向对象**:指令成为了真正的"自包含"实体
|
||||
|
||||
## 完整示例代码
|
||||
|
||||
```java
|
||||
// 示例:带重试的智能起飞
|
||||
public Transaction createSmartTakeoffTransaction() {
|
||||
// 1. 创建所有指令实例
|
||||
TakeOffInstruction takeoffInstruction = new TakeOffInstruction();
|
||||
StartMissionInstruction startMissionInstruction = new StartMissionInstruction();
|
||||
EmergencyLandInstruction emergencyLandInstruction = new EmergencyLandInstruction();
|
||||
|
||||
// 2. 配置重试逻辑
|
||||
RetryTakeOffInstruction retryInstruction = new RetryTakeOffInstruction()
|
||||
.onSuccess(startMissionInstruction)
|
||||
.onFailure(emergencyLandInstruction);
|
||||
|
||||
// 3. 配置主起飞逻辑
|
||||
CheckDeviceInstruction checkInstruction = new CheckDeviceInstruction()
|
||||
.onSuccess(takeoffInstruction
|
||||
.onSuccess(startMissionInstruction)
|
||||
.onFailure(retryInstruction))
|
||||
.onFailure(new RepairDeviceInstruction()
|
||||
.then(takeoffInstruction));
|
||||
|
||||
// 4. 创建事务
|
||||
return new Transaction("智能起飞", CommandType.TAKE_OFF)
|
||||
.root(checkInstruction)
|
||||
.setTimeout(180000);
|
||||
}
|
||||
```
|
||||
|
|
|
|||
|
|
@ -1,16 +1,10 @@
|
|||
package com.tuoheng.machine.command;
|
||||
|
||||
import com.tuoheng.machine.instruction.Instruction;
|
||||
import com.tuoheng.machine.instruction.InstructionNode;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 事务(由多个指令节点组成,支持条件分支)
|
||||
* 事务(由多个指令组成的树状结构,支持条件分支)
|
||||
*/
|
||||
@Data
|
||||
public class Transaction {
|
||||
|
|
@ -25,99 +19,36 @@ public class Transaction {
|
|||
private CommandType commandType;
|
||||
|
||||
/**
|
||||
* 指令节点映射(nodeId -> InstructionNode)
|
||||
* 根指令(事务的起始指令)
|
||||
*/
|
||||
private Map<String, InstructionNode> nodeMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 起始节点ID
|
||||
*/
|
||||
private String startNodeId;
|
||||
private Instruction rootInstruction;
|
||||
|
||||
/**
|
||||
* 事务超时时间(毫秒)
|
||||
*/
|
||||
private long timeoutMs = 120000; // 默认2分钟
|
||||
|
||||
/**
|
||||
* 指令列表(按顺序执行)- 兼容旧版本
|
||||
* @deprecated 使用 nodeMap 和条件分支代替
|
||||
*/
|
||||
@Deprecated
|
||||
private List<Instruction> instructions = new ArrayList<>();
|
||||
|
||||
public Transaction(String name, CommandType commandType) {
|
||||
this.name = name;
|
||||
this.commandType = commandType;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加指令节点
|
||||
* 设置根指令
|
||||
*
|
||||
* @param instruction 根指令
|
||||
* @return Transaction 支持链式调用
|
||||
*/
|
||||
public Transaction addNode(InstructionNode node) {
|
||||
nodeMap.put(node.getNodeId(), node);
|
||||
// 如果是第一个节点,设置为起始节点
|
||||
if (startNodeId == null) {
|
||||
startNodeId = node.getNodeId();
|
||||
}
|
||||
public Transaction root(Instruction instruction) {
|
||||
this.rootInstruction = instruction;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加指令节点(简化版)
|
||||
* 获取根指令
|
||||
*/
|
||||
public Transaction addNode(String nodeId, Instruction instruction) {
|
||||
return addNode(new InstructionNode(nodeId, instruction));
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置起始节点
|
||||
*/
|
||||
public Transaction setStartNode(String nodeId) {
|
||||
if (!nodeMap.containsKey(nodeId)) {
|
||||
throw new IllegalArgumentException("节点不存在: " + nodeId);
|
||||
}
|
||||
this.startNodeId = nodeId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指令节点
|
||||
*/
|
||||
public InstructionNode getNode(String nodeId) {
|
||||
return nodeMap.get(nodeId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取起始节点
|
||||
*/
|
||||
public InstructionNode getStartNode() {
|
||||
return startNodeId != null ? nodeMap.get(startNodeId) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加指令(兼容旧版本,自动转换为线性节点链)
|
||||
* @deprecated 使用 addNode 和条件分支代替
|
||||
*/
|
||||
@Deprecated
|
||||
public Transaction addInstruction(Instruction instruction) {
|
||||
this.instructions.add(instruction);
|
||||
|
||||
// 自动转换为节点
|
||||
String nodeId = "node_" + instructions.size();
|
||||
InstructionNode node = new InstructionNode(nodeId, instruction);
|
||||
|
||||
// 如果不是第一个节点,将上一个节点的 always 指向当前节点
|
||||
if (instructions.size() > 1) {
|
||||
String prevNodeId = "node_" + (instructions.size() - 1);
|
||||
InstructionNode prevNode = nodeMap.get(prevNodeId);
|
||||
if (prevNode != null) {
|
||||
prevNode.always(nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
addNode(node);
|
||||
return this;
|
||||
public Instruction getRootInstruction() {
|
||||
return rootInstruction;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -127,18 +58,4 @@ public class Transaction {
|
|||
this.timeoutMs = timeoutMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否使用图结构(新版本)
|
||||
*/
|
||||
public boolean isGraphMode() {
|
||||
return !nodeMap.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否使用线性结构(旧版本)
|
||||
*/
|
||||
public boolean isLinearMode() {
|
||||
return !instructions.isEmpty() && nodeMap.isEmpty();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,14 +34,7 @@ public class TransactionExecutor {
|
|||
// 在新线程中执行事务
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
// 判断使用图模式还是线性模式
|
||||
if (transaction.isGraphMode()) {
|
||||
// 新版本:图结构执行(支持条件分支)
|
||||
executeGraphTransaction(transaction, context, startTime, future);
|
||||
} else {
|
||||
// 旧版本:线性执行(兼容旧代码)
|
||||
executeLinearTransaction(transaction, context, startTime, future);
|
||||
}
|
||||
executeInstructionTree(transaction, context, startTime, future);
|
||||
} catch (Exception e) {
|
||||
log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e);
|
||||
future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage()));
|
||||
|
|
@ -52,20 +45,20 @@ public class TransactionExecutor {
|
|||
}
|
||||
|
||||
/**
|
||||
* 执行图结构事务(新版本,支持条件分支)
|
||||
* 执行指令树
|
||||
*/
|
||||
private void executeGraphTransaction(Transaction transaction, InstructionContext context,
|
||||
long startTime, CompletableFuture<CommandResult> future) {
|
||||
// 从起始节点开始执行
|
||||
InstructionNode currentNode = transaction.getStartNode();
|
||||
if (currentNode == null) {
|
||||
log.error("事务没有起始节点: transaction={}", transaction.getName());
|
||||
future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有起始节点"));
|
||||
private void executeInstructionTree(Transaction transaction, InstructionContext context,
|
||||
long startTime, CompletableFuture<CommandResult> future) {
|
||||
// 从根指令开始执行
|
||||
Instruction currentInstruction = transaction.getRootInstruction();
|
||||
if (currentInstruction == null) {
|
||||
log.error("事务没有根指令: transaction={}", transaction.getName());
|
||||
future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有根指令"));
|
||||
return;
|
||||
}
|
||||
|
||||
// 循环执行节点,直到没有下一个节点
|
||||
while (currentNode != null) {
|
||||
// 循环执行指令,直到没有下一个指令
|
||||
while (true) {
|
||||
// 检查事务是否超时
|
||||
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
|
||||
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
|
||||
|
|
@ -73,123 +66,38 @@ public class TransactionExecutor {
|
|||
return;
|
||||
}
|
||||
|
||||
Instruction instruction = currentNode.getInstruction();
|
||||
log.debug("执行指令节点: nodeId={}, instruction={}", currentNode.getNodeId(), instruction.getName());
|
||||
|
||||
log.debug("执行指令: instruction={}", currentInstruction.getName());
|
||||
// 执行指令
|
||||
InstructionResult result = executeInstruction(instruction, context);
|
||||
InstructionResult result = executeInstruction(currentInstruction, context);
|
||||
// 根据执行结果获取下游指令
|
||||
Instruction nextInstruction = currentInstruction.getNextInstruction(result.isSuccess());
|
||||
|
||||
// 判断是否需要根据状态回调结果决定下一步
|
||||
boolean shouldBranch = shouldBranchByStateCallback(instruction, context);
|
||||
|
||||
if (shouldBranch) {
|
||||
// 根据状态回调结果决定下一步
|
||||
String nextNodeId = currentNode.getNextNodeId(result.isSuccess());
|
||||
|
||||
if (nextNodeId == null) {
|
||||
// 找不到下一步指令,将状态回调结果作为事务结果返回
|
||||
log.info("状态回调完成,无下一步指令,事务结束: success={}, nodeId={}",
|
||||
result.isSuccess(), currentNode.getNodeId());
|
||||
|
||||
if (result.isSuccess()) {
|
||||
future.complete(CommandResult.success(transaction.getCommandType(), result.getData()));
|
||||
} else {
|
||||
future.complete(CommandResult.failure(
|
||||
transaction.getCommandType(),
|
||||
result.getErrorMessage(),
|
||||
instruction.getName()
|
||||
));
|
||||
}
|
||||
if (nextInstruction != null) {
|
||||
// 有下游指令,继续执行
|
||||
currentInstruction = nextInstruction;
|
||||
log.debug("根据执行结果选择下游指令: success={}, nextInstruction={}",
|
||||
result.isSuccess(), nextInstruction.getName());
|
||||
} else {
|
||||
// 没有下游指令,当前指令的结果就是事务的结果
|
||||
if (!result.isSuccess()) {
|
||||
// 指令失败,事务失败
|
||||
log.error("指令执行失败(无下游指令): instruction={}, error={}",
|
||||
currentInstruction.getName(), result.getErrorMessage());
|
||||
future.complete(CommandResult.failure(
|
||||
transaction.getCommandType(),
|
||||
result.getErrorMessage(),
|
||||
currentInstruction.getName()
|
||||
));
|
||||
return;
|
||||
} else {
|
||||
// 指令成功,事务成功
|
||||
log.info("指令执行成功(无下游指令),事务完成: instruction={}, sn={}",
|
||||
currentInstruction.getName(), context.getSn());
|
||||
future.complete(CommandResult.success(transaction.getCommandType()));
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取下一个节点
|
||||
currentNode = transaction.getNode(nextNodeId);
|
||||
log.debug("根据状态回调结果选择下一个节点: success={}, nextNodeId={}",
|
||||
result.isSuccess(), nextNodeId);
|
||||
|
||||
} else {
|
||||
// 不需要分支,按照传统逻辑处理
|
||||
if (!result.isSuccess()) {
|
||||
// 指令失败,检查是否有失败分支
|
||||
String nextNodeId = currentNode.getNextNodeId(false);
|
||||
if (nextNodeId != null) {
|
||||
currentNode = transaction.getNode(nextNodeId);
|
||||
log.debug("指令失败,跳转到失败分支: nextNodeId={}", nextNodeId);
|
||||
} else {
|
||||
// 没有失败分支,终止事务
|
||||
log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage());
|
||||
future.complete(CommandResult.failure(
|
||||
transaction.getCommandType(),
|
||||
result.getErrorMessage(),
|
||||
instruction.getName()
|
||||
));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// 指令成功,获取下一个节点
|
||||
String nextNodeId = currentNode.getNextNodeId(true);
|
||||
if (nextNodeId != null) {
|
||||
currentNode = transaction.getNode(nextNodeId);
|
||||
log.debug("指令成功,继续下一个节点: nextNodeId={}", nextNodeId);
|
||||
} else {
|
||||
// 没有下一个节点,事务成功完成
|
||||
log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn());
|
||||
future.complete(CommandResult.success(transaction.getCommandType()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 正常结束(虽然理论上不会走到这里)
|
||||
log.info("事务执行完成: transaction={}, sn={}", transaction.getName(), context.getSn());
|
||||
future.complete(CommandResult.success(transaction.getCommandType()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行线性事务(旧版本,兼容旧代码)
|
||||
*/
|
||||
private void executeLinearTransaction(Transaction transaction, InstructionContext context,
|
||||
long startTime, CompletableFuture<CommandResult> future) {
|
||||
// 依次执行每个指令
|
||||
for (Instruction instruction : transaction.getInstructions()) {
|
||||
// 检查事务是否超时
|
||||
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
|
||||
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
|
||||
future.complete(CommandResult.timeout(transaction.getCommandType()));
|
||||
return;
|
||||
}
|
||||
|
||||
// 执行指令
|
||||
InstructionResult result = executeInstruction(instruction, context);
|
||||
|
||||
// 如果指令失败,终止事务
|
||||
if (!result.isSuccess()) {
|
||||
log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage());
|
||||
future.complete(CommandResult.failure(
|
||||
transaction.getCommandType(),
|
||||
result.getErrorMessage(),
|
||||
instruction.getName()
|
||||
));
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("指令执行成功: instruction={}", instruction.getName());
|
||||
}
|
||||
|
||||
// 所有指令执行成功
|
||||
log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn());
|
||||
future.complete(CommandResult.success(transaction.getCommandType()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否需要根据状态回调结果决定下一步
|
||||
* 条件:配置了 stateCallback 且 canShortCircuit 为 false
|
||||
*/
|
||||
private boolean shouldBranchByStateCallback(Instruction instruction, InstructionContext context) {
|
||||
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
|
||||
return stateCallback != null && !stateCallback.isCanShortCircuit();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -214,7 +122,7 @@ public class TransactionExecutor {
|
|||
|
||||
// c. 等待方法回调
|
||||
CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
|
||||
if (methodCallback != null && !methodCallback.isCanShortCircuit()) {
|
||||
if (methodCallback != null) {
|
||||
InstructionResult methodResult = waitForCallback(methodCallback, context);
|
||||
if (!methodResult.isSuccess()) {
|
||||
instruction.onComplete(context, methodResult);
|
||||
|
|
@ -224,7 +132,7 @@ public class TransactionExecutor {
|
|||
|
||||
// d. 等待状态回调
|
||||
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
|
||||
if (stateCallback != null && !stateCallback.isCanShortCircuit()) {
|
||||
if (stateCallback != null) {
|
||||
InstructionResult stateResult = waitForCallback(stateCallback, context);
|
||||
// 注意:这里不立即返回,而是将结果传递给上层,由上层决定下一步
|
||||
instruction.onComplete(context, stateResult);
|
||||
|
|
@ -237,7 +145,7 @@ public class TransactionExecutor {
|
|||
return result;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("指令执行异常: instruction={}, sn=", instruction.getName(), context.getSn(), e);
|
||||
log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
|
||||
InstructionResult result = InstructionResult.failure("指令执行异常: " + e.getMessage());
|
||||
instruction.onComplete(context, result);
|
||||
return result;
|
||||
|
|
@ -271,8 +179,7 @@ public class TransactionExecutor {
|
|||
|
||||
try {
|
||||
// 等待回调或超时
|
||||
InstructionResult result = future.get(callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
|
||||
return result;
|
||||
return future.get(callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
|
||||
} catch (Exception e) {
|
||||
log.warn("等待回调超时: topic={}, timeout={}ms", callbackConfig.getTopic(), callbackConfig.getTimeoutMs());
|
||||
return InstructionResult.timeout();
|
||||
|
|
|
|||
|
|
@ -1,433 +0,0 @@
|
|||
package com.tuoheng.machine.example;
|
||||
|
||||
import com.tuoheng.machine.command.CommandType;
|
||||
import com.tuoheng.machine.command.Transaction;
|
||||
import com.tuoheng.machine.instruction.AbstractInstruction;
|
||||
import com.tuoheng.machine.instruction.CallbackConfig;
|
||||
import com.tuoheng.machine.instruction.InstructionContext;
|
||||
import com.tuoheng.machine.instruction.InstructionNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 条件分支执行示例
|
||||
*
|
||||
* 演示如何使用新的图结构和条件分支功能
|
||||
*/
|
||||
@Slf4j
|
||||
public class ConditionalBranchExample {
|
||||
|
||||
/**
|
||||
* 示例1:简单的成功/失败分支
|
||||
*
|
||||
* 流程:
|
||||
* 1. 检查设备状态
|
||||
* 2. 如果状态正常 -> 执行起飞
|
||||
* 3. 如果状态异常 -> 执行修复操作
|
||||
*/
|
||||
public Transaction example1_SimpleConditionalBranch() {
|
||||
Transaction transaction = new Transaction("条件分支起飞", CommandType.TAKE_OFF);
|
||||
|
||||
// 节点1:检查设备状态
|
||||
InstructionNode checkNode = new InstructionNode("check", new CheckDeviceInstruction())
|
||||
.onSuccess("takeoff") // 成功 -> 起飞
|
||||
.onFailure("repair"); // 失败 -> 修复
|
||||
|
||||
// 节点2:起飞指令
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction());
|
||||
// 起飞后没有下一步,事务结束
|
||||
|
||||
// 节点3:修复指令
|
||||
InstructionNode repairNode = new InstructionNode("repair", new RepairDeviceInstruction())
|
||||
.always("takeoff"); // 修复后 -> 起飞
|
||||
|
||||
// 添加所有节点
|
||||
transaction.addNode(checkNode)
|
||||
.addNode(takeoffNode)
|
||||
.addNode(repairNode)
|
||||
.setStartNode("check");
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
||||
/**
|
||||
* 示例2:基于状态回调结果的分支
|
||||
*
|
||||
* 流程:
|
||||
* 1. 发送起飞指令
|
||||
* 2. 等待状态回调(stateCallback,canShortCircuit=false)
|
||||
* 3. 如果起飞成功 -> 开始任务
|
||||
* 4. 如果起飞失败 -> 重试起飞
|
||||
*/
|
||||
public Transaction example2_StateCallbackBranch() {
|
||||
Transaction transaction = new Transaction("智能起飞", CommandType.TAKE_OFF);
|
||||
|
||||
// 节点1:起飞指令(配置了状态回调)
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction())
|
||||
.onSuccess("start_mission") // 状态回调成功 -> 开始任务
|
||||
.onFailure("retry_takeoff"); // 状态回调失败 -> 重试
|
||||
|
||||
// 节点2:开始任务
|
||||
InstructionNode startMissionNode = new InstructionNode("start_mission", new StartMissionInstruction());
|
||||
|
||||
// 节点3:重试起飞
|
||||
InstructionNode retryNode = new InstructionNode("retry_takeoff", new RetryTakeOffInstruction())
|
||||
.onSuccess("start_mission") // 重试成功 -> 开始任务
|
||||
.onFailure("emergency_land"); // 重试失败 -> 紧急降落
|
||||
|
||||
// 节点4:紧急降落
|
||||
InstructionNode emergencyLandNode = new InstructionNode("emergency_land", new EmergencyLandInstruction());
|
||||
|
||||
transaction.addNode(takeoffNode)
|
||||
.addNode(startMissionNode)
|
||||
.addNode(retryNode)
|
||||
.addNode(emergencyLandNode)
|
||||
.setStartNode("takeoff");
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
||||
/**
|
||||
* 示例3:复杂的多分支流程
|
||||
*
|
||||
* 流程:
|
||||
* 1. 打开舱门
|
||||
* 2. 检查天气
|
||||
* 3. 如果天气好 -> 起飞 -> 执行任务
|
||||
* 4. 如果天气差 -> 等待 -> 重新检查天气
|
||||
* 5. 如果天气极差 -> 关闭舱门 -> 取消任务
|
||||
*/
|
||||
public Transaction example3_ComplexBranch() {
|
||||
Transaction transaction = new Transaction("智能任务执行", CommandType.START_MISSION);
|
||||
|
||||
// 节点1:打开舱门
|
||||
InstructionNode openCoverNode = new InstructionNode("open_cover", new OpenCoverInstruction())
|
||||
.always("check_weather");
|
||||
|
||||
// 节点2:检查天气
|
||||
InstructionNode checkWeatherNode = new InstructionNode("check_weather", new CheckWeatherInstruction())
|
||||
.onSuccess("takeoff") // 天气好 -> 起飞
|
||||
.onFailure("handle_bad_weather"); // 天气差 -> 处理
|
||||
|
||||
// 节点3:起飞
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffInstruction())
|
||||
.onSuccess("execute_mission") // 起飞成功 -> 执行任务
|
||||
.onFailure("close_cover"); // 起飞失败 -> 关闭舱门
|
||||
|
||||
// 节点4:执行任务
|
||||
InstructionNode executeMissionNode = new InstructionNode("execute_mission", new ExecuteMissionInstruction());
|
||||
|
||||
// 节点5:处理恶劣天气
|
||||
InstructionNode handleBadWeatherNode = new InstructionNode("handle_bad_weather", new HandleBadWeatherInstruction())
|
||||
.onSuccess("wait_and_retry") // 可以等待 -> 等待后重试
|
||||
.onFailure("close_cover"); // 天气极差 -> 关闭舱门
|
||||
|
||||
// 节点6:等待并重试
|
||||
InstructionNode waitAndRetryNode = new InstructionNode("wait_and_retry", new WaitInstruction())
|
||||
.always("check_weather"); // 等待后 -> 重新检查天气
|
||||
|
||||
// 节点7:关闭舱门
|
||||
InstructionNode closeCoverNode = new InstructionNode("close_cover", new CloseCoverInstruction());
|
||||
|
||||
transaction.addNode(openCoverNode)
|
||||
.addNode(checkWeatherNode)
|
||||
.addNode(takeoffNode)
|
||||
.addNode(executeMissionNode)
|
||||
.addNode(handleBadWeatherNode)
|
||||
.addNode(waitAndRetryNode)
|
||||
.addNode(closeCoverNode)
|
||||
.setStartNode("open_cover")
|
||||
.setTimeout(300000); // 5分钟超时
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
||||
/**
|
||||
* 示例4:状态回调结果直接作为事务结果
|
||||
*
|
||||
* 流程:
|
||||
* 1. 发送起飞指令
|
||||
* 2. 等待状态回调(canShortCircuit=false)
|
||||
* 3. 没有配置下一步节点
|
||||
* 4. 状态回调的结果直接作为事务结果返回
|
||||
*/
|
||||
public Transaction example4_StateCallbackAsResult() {
|
||||
Transaction transaction = new Transaction("简单起飞", CommandType.TAKE_OFF);
|
||||
|
||||
// 只有一个节点,配置了状态回调,但没有配置下一步
|
||||
InstructionNode takeoffNode = new InstructionNode("takeoff", new TakeOffWithStateCallbackInstruction());
|
||||
// 注意:没有配置 onSuccess 或 onFailure
|
||||
// 状态回调的结果将直接作为事务结果返回
|
||||
|
||||
transaction.addNode(takeoffNode)
|
||||
.setStartNode("takeoff");
|
||||
|
||||
return transaction;
|
||||
}
|
||||
|
||||
// ==================== 示例指令实现 ====================
|
||||
|
||||
/**
|
||||
* 检查设备状态指令
|
||||
*/
|
||||
static class CheckDeviceInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "CHECK_DEVICE";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("检查设备状态: sn={}", context.getSn());
|
||||
// 发送MQTT消息检查设备状态
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
||||
return CallbackConfig.builder()
|
||||
.topic("device/" + context.getSn() + "/status")
|
||||
.fieldPath("status")
|
||||
.expectedValue("OK")
|
||||
.canShortCircuit(false) // 必须等待状态回调
|
||||
.timeoutMs(10000)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 起飞指令(带状态回调)
|
||||
*/
|
||||
static class TakeOffWithStateCallbackInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "TAKE_OFF_WITH_STATE";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("发送起飞指令: sn={}", context.getSn());
|
||||
// 发送MQTT起飞指令
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackConfig getMethodCallbackConfig(InstructionContext context) {
|
||||
return CallbackConfig.builder()
|
||||
.topic("device/" + context.getSn() + "/response")
|
||||
.fieldPath("cmd")
|
||||
.expectedValue("takeoff")
|
||||
.canShortCircuit(false)
|
||||
.timeoutMs(10000)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
||||
// 关键:canShortCircuit=false,表示必须根据状态回调结果决定下一步
|
||||
return CallbackConfig.builder()
|
||||
.topic("device/" + context.getSn() + "/state")
|
||||
.fieldPath("droneState")
|
||||
.expectedValue("FLYING")
|
||||
.canShortCircuit(false) // 必须等待状态回调,并根据结果决定下一步
|
||||
.timeoutMs(60000)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 简单起飞指令
|
||||
*/
|
||||
static class TakeOffInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "TAKE_OFF";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("起飞: sn=", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 修复设备指令
|
||||
*/
|
||||
static class RepairDeviceInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "REPAIR_DEVICE";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("修复设备: sn={}", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 开始任务指令
|
||||
*/
|
||||
static class StartMissionInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "START_MISSION";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("开始任务: sn={}", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 重试起飞指令
|
||||
*/
|
||||
static class RetryTakeOffInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "RETRY_TAKE_OFF";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("重试起飞: sn={}", context.getSn());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
||||
return CallbackConfig.builder()
|
||||
.topic("device/" + context.getSn() + "/state")
|
||||
.fieldPath("droneState")
|
||||
.expectedValue("FLYING")
|
||||
.canShortCircuit(false)
|
||||
.timeoutMs(60000)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 紧急降落指令
|
||||
*/
|
||||
static class EmergencyLandInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "EMERGENCY_LAND";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("紧急降落: sn={}", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 打开舱门指令
|
||||
*/
|
||||
static class OpenCoverInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "OPEN_COVER";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("打开舱门: sn={}", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭舱门指令
|
||||
*/
|
||||
static class CloseCoverInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "CLOSE_COVER";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("关闭舱门: sn={}", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查天气指令
|
||||
*/
|
||||
static class CheckWeatherInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "CHECK_WEATHER";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("检查天气: sn={}", context.getSn());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
||||
return CallbackConfig.builder()
|
||||
.topic("weather/status")
|
||||
.fieldPath("condition")
|
||||
.expectedValue("GOOD")
|
||||
.canShortCircuit(false)
|
||||
.timeoutMs(10000)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行任务指令
|
||||
*/
|
||||
static class ExecuteMissionInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "EXECUTE_MISSION";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("执行任务: sn={}", context.getSn());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理恶劣天气指令
|
||||
*/
|
||||
static class HandleBadWeatherInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "HANDLE_BAD_WEATHER";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("处理恶劣天气: sn={}", context.getSn());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CallbackConfig getStateCallbackConfig(InstructionContext context) {
|
||||
return CallbackConfig.builder()
|
||||
.topic("weather/status")
|
||||
.fieldPath("severity")
|
||||
.customPredicate(severity -> !"EXTREME".equals(severity))
|
||||
.canShortCircuit(false)
|
||||
.timeoutMs(5000)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 等待指令
|
||||
*/
|
||||
static class WaitInstruction extends AbstractInstruction {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "WAIT";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeRemoteCall(InstructionContext context) throws Exception {
|
||||
log.info("等待中: sn={}", context.getSn());
|
||||
Thread.sleep(30000); // 等待30秒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +1,25 @@
|
|||
package com.tuoheng.machine.instruction;
|
||||
|
||||
/**
|
||||
* 抽象指令基类,提供默认实现
|
||||
* 抽象指令基类,提供默认实现和下游节点管理
|
||||
*/
|
||||
public abstract class AbstractInstruction implements Instruction {
|
||||
|
||||
/**
|
||||
* 成功后执行的下一个指令
|
||||
*/
|
||||
private Instruction onSuccessInstruction;
|
||||
|
||||
/**
|
||||
* 失败后执行的下一个指令
|
||||
*/
|
||||
private Instruction onFailureInstruction;
|
||||
|
||||
/**
|
||||
* 无论成功失败都执行的下一个指令
|
||||
*/
|
||||
private Instruction alwaysNextInstruction;
|
||||
|
||||
@Override
|
||||
public boolean canExecute(InstructionContext context) {
|
||||
// 默认可以执行
|
||||
|
|
@ -32,4 +47,43 @@ public abstract class AbstractInstruction implements Instruction {
|
|||
public void onComplete(InstructionContext context, InstructionResult result) {
|
||||
// 默认空实现
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instruction getOnSuccessInstruction() {
|
||||
return onSuccessInstruction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instruction getOnFailureInstruction() {
|
||||
return onFailureInstruction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instruction getAlwaysNextInstruction() {
|
||||
return alwaysNextInstruction;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置成功后执行的指令(支持链式调用)
|
||||
*/
|
||||
public <T extends AbstractInstruction> T onSuccess(Instruction instruction) {
|
||||
this.onSuccessInstruction = instruction;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置失败后执行的指令(支持链式调用)
|
||||
*/
|
||||
public <T extends AbstractInstruction> T onFailure(Instruction instruction) {
|
||||
this.onFailureInstruction = instruction;
|
||||
return (T) this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置无论成功失败都执行的指令(支持链式调用)
|
||||
*/
|
||||
public <T extends AbstractInstruction> T then(Instruction instruction) {
|
||||
this.alwaysNextInstruction = instruction;
|
||||
return (T) this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,11 +35,6 @@ public class CallbackConfig {
|
|||
*/
|
||||
private Predicate<Object> customPredicate;
|
||||
|
||||
/**
|
||||
* 是否可以被短路(跳过此回调)
|
||||
*/
|
||||
private boolean canShortCircuit;
|
||||
|
||||
/**
|
||||
* 超时时间(毫秒)
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -49,4 +49,46 @@ public interface Instruction {
|
|||
default void onComplete(InstructionContext context, InstructionResult result) {
|
||||
// 默认空实现
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取成功后执行的下一个指令
|
||||
*/
|
||||
default Instruction getOnSuccessInstruction() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取失败后执行的下一个指令
|
||||
*/
|
||||
default Instruction getOnFailureInstruction() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取无论成功失败都执行的下一个指令(优先级最高)
|
||||
*/
|
||||
default Instruction getAlwaysNextInstruction() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据执行结果获取下一个指令
|
||||
*
|
||||
* @param success 是否成功
|
||||
* @return 下一个指令,如果没有则返回null
|
||||
*/
|
||||
default Instruction getNextInstruction(boolean success) {
|
||||
// 优先返回 always 配置
|
||||
Instruction alwaysNext = getAlwaysNextInstruction();
|
||||
if (alwaysNext != null) {
|
||||
return alwaysNext;
|
||||
}
|
||||
|
||||
// 根据成功失败返回对应的指令
|
||||
if (success) {
|
||||
return getOnSuccessInstruction();
|
||||
} else {
|
||||
return getOnFailureInstruction();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,90 +0,0 @@
|
|||
package com.tuoheng.machine.instruction;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 指令节点(支持条件分支)
|
||||
*/
|
||||
@Data
|
||||
public class InstructionNode {
|
||||
/**
|
||||
* 指令实例
|
||||
*/
|
||||
private Instruction instruction;
|
||||
|
||||
/**
|
||||
* 成功后执行的下一个指令节点ID
|
||||
*/
|
||||
private String onSuccessNodeId;
|
||||
|
||||
/**
|
||||
* 失败后执行的下一个指令节点ID
|
||||
*/
|
||||
private String onFailureNodeId;
|
||||
|
||||
/**
|
||||
* 无论成功失败都执行的下一个指令节点ID(优先级最高)
|
||||
*/
|
||||
private String alwaysNextNodeId;
|
||||
|
||||
/**
|
||||
* 节点ID(唯一标识)
|
||||
*/
|
||||
private String nodeId;
|
||||
|
||||
public InstructionNode(String nodeId, Instruction instruction) {
|
||||
this.nodeId = nodeId;
|
||||
this.instruction = instruction;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置无论成功失败都执行的下一个节点
|
||||
*/
|
||||
public InstructionNode always(String nextNodeId) {
|
||||
this.alwaysNextNodeId = nextNodeId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置成功后执行的下一个节点
|
||||
*/
|
||||
public InstructionNode onSuccess(String nextNodeId) {
|
||||
this.onSuccessNodeId = nextNodeId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置失败后执行的下一个节点
|
||||
*/
|
||||
public InstructionNode onFailure(String nextNodeId) {
|
||||
this.onFailureNodeId = nextNodeId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据执行结果获取下一个节点ID
|
||||
*
|
||||
* @param success 是否成功
|
||||
* @return 下一个节点ID,如果没有则返回null
|
||||
*/
|
||||
public String getNextNodeId(boolean success) {
|
||||
// 优先返回 always 配置
|
||||
if (alwaysNextNodeId != null) {
|
||||
return alwaysNextNodeId;
|
||||
}
|
||||
|
||||
// 根据成功失败返回对应的节点
|
||||
if (success) {
|
||||
return onSuccessNodeId;
|
||||
} else {
|
||||
return onFailureNodeId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否配置了下一个节点
|
||||
*/
|
||||
public boolean hasNextNode(boolean success) {
|
||||
return getNextNodeId(success) != null;
|
||||
}
|
||||
}
|
||||
|
|
@ -127,57 +127,55 @@ public class DjiVendorConfig implements VendorConfig {
|
|||
private void initTransactions() {
|
||||
// 起飞命令
|
||||
Transaction takeOffTransaction = new Transaction("起飞", CommandType.TAKE_OFF)
|
||||
.addInstruction(new DjiTakeOffInstruction())
|
||||
.root(new DjiTakeOffInstruction())
|
||||
.setTimeout(90000);
|
||||
transactionMap.put(CommandType.TAKE_OFF, takeOffTransaction);
|
||||
|
||||
|
||||
|
||||
// 返航命令
|
||||
Transaction returnHomeTransaction = new Transaction("返航", CommandType.RETURN_HOME)
|
||||
.addInstruction(new DjiReturnHomeInstruction())
|
||||
.root(new DjiReturnHomeInstruction())
|
||||
.setTimeout(120000);
|
||||
transactionMap.put(CommandType.RETURN_HOME, returnHomeTransaction);
|
||||
|
||||
// 急停命令
|
||||
Transaction emergencyStopTransaction = new Transaction("急停", CommandType.EMERGENCY_STOP)
|
||||
.addInstruction(new DjiEmergencyStopInstruction())
|
||||
.root(new DjiEmergencyStopInstruction())
|
||||
.setTimeout(30000);
|
||||
transactionMap.put(CommandType.EMERGENCY_STOP, emergencyStopTransaction);
|
||||
|
||||
// 继续飞行命令
|
||||
Transaction resumeFlightTransaction = new Transaction("继续飞行", CommandType.RESUME_FLIGHT)
|
||||
.addInstruction(new DjiResumeFlightInstruction())
|
||||
.root(new DjiResumeFlightInstruction())
|
||||
.setTimeout(60000);
|
||||
transactionMap.put(CommandType.RESUME_FLIGHT, resumeFlightTransaction);
|
||||
|
||||
// 指点飞行命令
|
||||
Transaction pointFlyTransaction = new Transaction("指点飞行", CommandType.POINT_FLY)
|
||||
.addInstruction(new DjiPointFlyInstruction())
|
||||
.root(new DjiPointFlyInstruction())
|
||||
.setTimeout(90000);
|
||||
transactionMap.put(CommandType.POINT_FLY, pointFlyTransaction);
|
||||
|
||||
// 取消指点命令
|
||||
Transaction cancelPointTransaction = new Transaction("取消指点", CommandType.CANCEL_POINT)
|
||||
.addInstruction(new DjiCancelPointInstruction())
|
||||
.root(new DjiCancelPointInstruction())
|
||||
.setTimeout(30000);
|
||||
transactionMap.put(CommandType.CANCEL_POINT, cancelPointTransaction);
|
||||
|
||||
// 开始航线任务命令
|
||||
Transaction startMissionTransaction = new Transaction("开始航线任务", CommandType.START_MISSION)
|
||||
.addInstruction(new DjiStartMissionInstruction())
|
||||
.root(new DjiStartMissionInstruction())
|
||||
.setTimeout(120000);
|
||||
transactionMap.put(CommandType.START_MISSION, startMissionTransaction);
|
||||
|
||||
// 打开舱门命令
|
||||
Transaction openCoverTransaction = new Transaction("打开舱门", CommandType.OPEN_COVER)
|
||||
.addInstruction(new DjiOpenCoverInstruction())
|
||||
.root(new DjiOpenCoverInstruction())
|
||||
.setTimeout(60000);
|
||||
transactionMap.put(CommandType.OPEN_COVER, openCoverTransaction);
|
||||
|
||||
// 关闭舱门命令
|
||||
Transaction closeCoverTransaction = new Transaction("关闭舱门", CommandType.CLOSE_COVER)
|
||||
.addInstruction(new DjiCloseCoverInstruction())
|
||||
.root(new DjiCloseCoverInstruction())
|
||||
.setTimeout(60000);
|
||||
transactionMap.put(CommandType.CLOSE_COVER, closeCoverTransaction);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue