Compare commits
No commits in common. "1507e758cdaca9a8ceccae39016d32438e514e24" and "96561c749cb785df3f6bd3e033a73914973c5fcd" have entirely different histories.
1507e758cd
...
96561c749c
|
|
@ -162,37 +162,7 @@ public class TransactionExecutor {
|
||||||
return CompletableFuture.completedFuture(result);
|
return CompletableFuture.completedFuture(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
// b. 预先获取回调配置(在发送命令前)
|
// b. 在线程池中执行远程调用(避免阻塞当前线程)
|
||||||
CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
|
|
||||||
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
|
|
||||||
|
|
||||||
// 设置回调类型
|
|
||||||
if (methodCallback != null) {
|
|
||||||
methodCallback.setCallbackType(CallbackConfig.CallbackType.METHOD);
|
|
||||||
}
|
|
||||||
if (stateCallback != null) {
|
|
||||||
stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE);
|
|
||||||
}
|
|
||||||
|
|
||||||
// c. 预先注册回调(在发送命令前,避免竞态条件)
|
|
||||||
CompletableFuture<InstructionResult> methodFuture = null;
|
|
||||||
CompletableFuture<InstructionResult> stateFuture = null;
|
|
||||||
|
|
||||||
if (methodCallback != null) {
|
|
||||||
log.info("【预注册方法回调】instruction={}, topic={}",
|
|
||||||
instruction.getName(), methodCallback.getTopic());
|
|
||||||
methodFuture = waitForCallbackAsync(methodCallback, context);
|
|
||||||
}
|
|
||||||
if (stateCallback != null) {
|
|
||||||
log.info("【预注册状态回调】instruction={}, topic={}",
|
|
||||||
instruction.getName(), stateCallback.getTopic());
|
|
||||||
stateFuture = waitForCallbackAsync(stateCallback, context);
|
|
||||||
}
|
|
||||||
|
|
||||||
// d. 在线程池中执行远程调用(回调已经注册好了)
|
|
||||||
CompletableFuture<InstructionResult> finalMethodFuture = methodFuture;
|
|
||||||
CompletableFuture<InstructionResult> finalStateFuture = stateFuture;
|
|
||||||
|
|
||||||
return CompletableFuture.supplyAsync(() -> {
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
try {
|
try {
|
||||||
instruction.executeRemoteCall(context);
|
instruction.executeRemoteCall(context);
|
||||||
|
|
@ -204,31 +174,32 @@ public class TransactionExecutor {
|
||||||
}
|
}
|
||||||
}, commandExecutor).thenCompose(remoteCallSuccess -> {
|
}, commandExecutor).thenCompose(remoteCallSuccess -> {
|
||||||
if (!remoteCallSuccess) {
|
if (!remoteCallSuccess) {
|
||||||
// 命令发送失败,取消已注册的回调(避免资源泄漏)
|
|
||||||
if (finalMethodFuture != null) {
|
|
||||||
log.warn("命令发送失败,取消方法回调");
|
|
||||||
finalMethodFuture.cancel(true);
|
|
||||||
}
|
|
||||||
if (finalStateFuture != null) {
|
|
||||||
log.warn("命令发送失败,取消状态回调");
|
|
||||||
finalStateFuture.cancel(true);
|
|
||||||
}
|
|
||||||
InstructionResult result = InstructionResult.failure("远程调用失败");
|
InstructionResult result = InstructionResult.failure("远程调用失败");
|
||||||
instruction.onComplete(context, result);
|
instruction.onComplete(context, result);
|
||||||
return CompletableFuture.completedFuture(result);
|
return CompletableFuture.completedFuture(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
// e. 等待方法回调(已经预注册)
|
// c. 等待方法回调(异步)
|
||||||
if (finalMethodFuture != null) {
|
CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
|
||||||
return finalMethodFuture.thenCompose(methodResult -> {
|
if (methodCallback != null) {
|
||||||
|
// 自动设置为方法回调类型
|
||||||
|
methodCallback.setCallbackType(CallbackConfig.CallbackType.METHOD);
|
||||||
|
|
||||||
|
return waitForCallbackAsync(methodCallback, context)
|
||||||
|
.thenCompose(methodResult -> {
|
||||||
if (!methodResult.isSuccess()) {
|
if (!methodResult.isSuccess()) {
|
||||||
instruction.onComplete(context, methodResult);
|
instruction.onComplete(context, methodResult);
|
||||||
return CompletableFuture.completedFuture(methodResult);
|
return CompletableFuture.completedFuture(methodResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
// f. 等待状态回调(已经预注册)
|
// d. 等待状态回调(异步)
|
||||||
if (finalStateFuture != null) {
|
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
|
||||||
return finalStateFuture.thenApply(stateResult -> {
|
if (stateCallback != null) {
|
||||||
|
// 自动设置为状态回调类型
|
||||||
|
stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE);
|
||||||
|
|
||||||
|
return waitForCallbackAsync(stateCallback, context)
|
||||||
|
.thenApply(stateResult -> {
|
||||||
instruction.onComplete(context, stateResult);
|
instruction.onComplete(context, stateResult);
|
||||||
return stateResult;
|
return stateResult;
|
||||||
});
|
});
|
||||||
|
|
@ -242,8 +213,13 @@ public class TransactionExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 没有方法回调,检查是否有状态回调
|
// 没有方法回调,检查是否有状态回调
|
||||||
if (finalStateFuture != null) {
|
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
|
||||||
return finalStateFuture.thenApply(stateResult -> {
|
if (stateCallback != null) {
|
||||||
|
// 自动设置为状态回调类型
|
||||||
|
stateCallback.setCallbackType(CallbackConfig.CallbackType.STATE);
|
||||||
|
|
||||||
|
return waitForCallbackAsync(stateCallback, context)
|
||||||
|
.thenApply(stateResult -> {
|
||||||
instruction.onComplete(context, stateResult);
|
instruction.onComplete(context, stateResult);
|
||||||
return stateResult;
|
return stateResult;
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue