Compare commits

...

2 Commits

Author SHA1 Message Date
孙小云 1507e758cd 修改回调机制 2026-02-11 10:24:15 +08:00
孙小云 8f85fc7a1e 先注册再发送命令 2026-02-11 10:12:14 +08:00
1 changed files with 64 additions and 40 deletions

View File

@ -162,7 +162,37 @@ public class TransactionExecutor {
return CompletableFuture.completedFuture(result); return CompletableFuture.completedFuture(result);
} }
// b. 在线程池中执行远程调用避免阻塞当前线程 // b. 预先获取回调配置在发送命令前
CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
// 设置回调类型
if (methodCallback != null) {
methodCallback.setCallbackType(CallbackConfig.CallbackType.METHOD);
}
if (stateCallback != null) {
stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE);
}
// c. 预先注册回调在发送命令前避免竞态条件
CompletableFuture<InstructionResult> methodFuture = null;
CompletableFuture<InstructionResult> stateFuture = null;
if (methodCallback != null) {
log.info("【预注册方法回调】instruction={}, topic={}",
instruction.getName(), methodCallback.getTopic());
methodFuture = waitForCallbackAsync(methodCallback, context);
}
if (stateCallback != null) {
log.info("【预注册状态回调】instruction={}, topic={}",
instruction.getName(), stateCallback.getTopic());
stateFuture = waitForCallbackAsync(stateCallback, context);
}
// d. 在线程池中执行远程调用回调已经注册好了
CompletableFuture<InstructionResult> finalMethodFuture = methodFuture;
CompletableFuture<InstructionResult> finalStateFuture = stateFuture;
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
try { try {
instruction.executeRemoteCall(context); instruction.executeRemoteCall(context);
@ -174,55 +204,49 @@ public class TransactionExecutor {
} }
}, commandExecutor).thenCompose(remoteCallSuccess -> { }, commandExecutor).thenCompose(remoteCallSuccess -> {
if (!remoteCallSuccess) { if (!remoteCallSuccess) {
// 命令发送失败取消已注册的回调避免资源泄漏
if (finalMethodFuture != null) {
log.warn("命令发送失败,取消方法回调");
finalMethodFuture.cancel(true);
}
if (finalStateFuture != null) {
log.warn("命令发送失败,取消状态回调");
finalStateFuture.cancel(true);
}
InstructionResult result = InstructionResult.failure("远程调用失败"); InstructionResult result = InstructionResult.failure("远程调用失败");
instruction.onComplete(context, result); instruction.onComplete(context, result);
return CompletableFuture.completedFuture(result); return CompletableFuture.completedFuture(result);
} }
// c. 等待方法回调异步 // e. 等待方法回调已经预注册
CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context); if (finalMethodFuture != null) {
if (methodCallback != null) { return finalMethodFuture.thenCompose(methodResult -> {
// 自动设置为方法回调类型 if (!methodResult.isSuccess()) {
methodCallback.setCallbackType(CallbackConfig.CallbackType.METHOD); instruction.onComplete(context, methodResult);
return CompletableFuture.completedFuture(methodResult);
}
return waitForCallbackAsync(methodCallback, context) // f. 等待状态回调已经预注册
.thenCompose(methodResult -> { if (finalStateFuture != null) {
if (!methodResult.isSuccess()) { return finalStateFuture.thenApply(stateResult -> {
instruction.onComplete(context, methodResult); instruction.onComplete(context, stateResult);
return CompletableFuture.completedFuture(methodResult); return stateResult;
} });
}
// d. 等待状态回调异步 // 没有状态回调直接成功
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); InstructionResult result = InstructionResult.success();
if (stateCallback != null) { instruction.onComplete(context, result);
// 自动设置为状态回调类型 return CompletableFuture.completedFuture(result);
stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE); });
return waitForCallbackAsync(stateCallback, context)
.thenApply(stateResult -> {
instruction.onComplete(context, stateResult);
return stateResult;
});
}
// 没有状态回调直接成功
InstructionResult result = InstructionResult.success();
instruction.onComplete(context, result);
return CompletableFuture.completedFuture(result);
});
} }
// 没有方法回调检查是否有状态回调 // 没有方法回调检查是否有状态回调
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); if (finalStateFuture != null) {
if (stateCallback != null) { return finalStateFuture.thenApply(stateResult -> {
// 自动设置为状态回调类型 instruction.onComplete(context, stateResult);
stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE); return stateResult;
});
return waitForCallbackAsync(stateCallback, context)
.thenApply(stateResult -> {
instruction.onComplete(context, stateResult);
return stateResult;
});
} }
// 没有任何回调直接成功 // 没有任何回调直接成功