实现完全异步化

This commit is contained in:
孙小云 2025-12-18 16:49:18 +08:00
parent cd12f726cb
commit c606bc50fd
2 changed files with 265 additions and 112 deletions

View File

@ -3,189 +3,270 @@ package com.tuoheng.machine.command;
import com.tuoheng.machine.instruction.*;
import com.tuoheng.machine.mqtt.MqttCallbackRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 事务执行器支持条件分支
* 事务执行器完全异步化版本
*
* 设计说明
* 1. 完全异步不阻塞任何线程所有操作都通过 CompletableFuture 链式调用
* 2. 高并发可以同时处理数万个命令不会创建大量线程
* 3. 资源高效线程只在真正需要执行任务时才使用不会浪费在等待上
*
* 性能优势
* - 传统方式10万个命令 = 10万个阻塞线程 = 系统崩溃
* - 异步方式10万个命令 = 200个工作线程 + 10万个 CompletableFuture = 正常运行
*/
@Slf4j
@Component
public class TransactionExecutor {
private final MqttCallbackRegistry callbackRegistry;
private final Executor commandExecutor;
private final ScheduledExecutorService timeoutScheduler;
public TransactionExecutor(MqttCallbackRegistry callbackRegistry) {
public TransactionExecutor(
MqttCallbackRegistry callbackRegistry,
@Qualifier("commandExecutor") Executor commandExecutor) {
this.callbackRegistry = callbackRegistry;
this.commandExecutor = commandExecutor;
// 创建一个专门用于超时检查的调度器核心线程数较小
this.timeoutScheduler = new ScheduledThreadPoolExecutor(
2,
r -> {
Thread t = new Thread(r, "timeout-scheduler");
t.setDaemon(true);
return t;
}
);
log.info("事务执行器初始化完成(完全异步模式)");
}
/**
* 执行事务
* 执行事务完全异步
*
* @param transaction 事务定义
* @param context 执行上下文
* @return CompletableFuture不会阻塞调用线程
*/
public CompletableFuture<CommandResult> executeTransaction(Transaction transaction, InstructionContext context) {
log.info("开始执行事务: transaction={}, sn={}", transaction.getName(), context.getSn());
CompletableFuture<CommandResult> future = new CompletableFuture<>();
long startTime = System.currentTimeMillis();
// 在新线程中执行事务
CompletableFuture.runAsync(() -> {
try {
executeInstructionTree(transaction, context, startTime, future);
} catch (Exception e) {
log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e);
future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage()));
}
});
return future;
// 直接返回异步执行的结果不创建新线程
return executeInstructionTreeAsync(transaction, context, startTime, transaction.getRootInstruction());
}
/**
* 执行指令树
* 异步执行指令树
*
* @param transaction 事务定义
* @param context 执行上下文
* @param startTime 事务开始时间
* @param currentInstruction 当前要执行的指令
* @return CompletableFuture
*/
private void executeInstructionTree(Transaction transaction, InstructionContext context,
long startTime, CompletableFuture<CommandResult> future) {
// 从根指令开始执行
Instruction currentInstruction = transaction.getRootInstruction();
private CompletableFuture<CommandResult> executeInstructionTreeAsync(
Transaction transaction,
InstructionContext context,
long startTime,
Instruction currentInstruction) {
// 检查根指令
if (currentInstruction == null) {
log.error("事务没有根指令: transaction={}", transaction.getName());
future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有根指令"));
return;
return CompletableFuture.completedFuture(
CommandResult.failure(transaction.getCommandType(), "事务没有根指令")
);
}
// 循环执行指令直到没有下一个指令
while (true) {
// 检查事务是否超时
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
future.complete(CommandResult.timeout(transaction.getCommandType()));
return;
}
// 检查事务是否超时
if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
return CompletableFuture.completedFuture(
CommandResult.timeout(transaction.getCommandType())
);
}
log.debug("执行指令: instruction={}", currentInstruction.getName());
// 执行指令
InstructionResult result = executeInstruction(currentInstruction, context);
// 根据执行结果获取下游指令
Instruction nextInstruction = currentInstruction.getNextInstruction(result.isSuccess());
log.debug("执行指令: instruction={}", currentInstruction.getName());
if (nextInstruction != null) {
// 有下游指令继续执行
currentInstruction = nextInstruction;
log.debug("根据执行结果选择下游指令: success={}, nextInstruction={}",
result.isSuccess(), nextInstruction.getName());
} else {
// 没有下游指令当前指令的结果就是事务的结果
if (!result.isSuccess()) {
// 指令失败事务失败
log.error("指令执行失败(无下游指令): instruction={}, error={}",
currentInstruction.getName(), result.getErrorMessage());
future.complete(CommandResult.failure(
transaction.getCommandType(),
result.getErrorMessage(),
currentInstruction.getName()
));
return;
// 异步执行当前指令
Instruction finalCurrentInstruction = currentInstruction;
return executeInstructionAsync(currentInstruction, context)
.thenCompose(result -> {
// 根据执行结果获取下游指令
Instruction nextInstruction = finalCurrentInstruction.getNextInstruction(result.isSuccess());
if (nextInstruction != null) {
// 有下游指令递归执行
log.debug("根据执行结果选择下游指令: success={}, nextInstruction={}",
result.isSuccess(), nextInstruction.getName());
return executeInstructionTreeAsync(transaction, context, startTime, nextInstruction);
} else {
// 指令成功事务成功
log.info("指令执行成功(无下游指令),事务完成: instruction={}, sn={}",
currentInstruction.getName(), context.getSn());
future.complete(CommandResult.success(transaction.getCommandType()));
return;
// 没有下游指令当前指令的结果就是事务的结果
if (!result.isSuccess()) {
log.error("指令执行失败(无下游指令): instruction={}, error={}",
finalCurrentInstruction.getName(), result.getErrorMessage());
return CompletableFuture.completedFuture(
CommandResult.failure(
transaction.getCommandType(),
result.getErrorMessage(),
finalCurrentInstruction.getName()
)
);
} else {
log.info("指令执行成功(无下游指令),事务完成: instruction={}, sn={}",
finalCurrentInstruction.getName(), context.getSn());
return CompletableFuture.completedFuture(
CommandResult.success(transaction.getCommandType())
);
}
}
}
}
});
}
/**
* 执行单个指令
* 异步执行单个指令
*
* @param instruction 指令
* @param context 执行上下文
* @return CompletableFuture<InstructionResult>
*/
private InstructionResult executeInstruction(Instruction instruction, InstructionContext context) {
private CompletableFuture<InstructionResult> executeInstructionAsync(
Instruction instruction,
InstructionContext context) {
log.debug("开始执行指令: instruction={}, sn={}", instruction.getName(), context.getSn());
try {
// a. 判断是否可以执行
if (!instruction.canExecute(context)) {
String error = "指令不满足执行条件";
log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn());
InstructionResult result = InstructionResult.failure(error);
// a. 判断是否可以执行
if (!instruction.canExecute(context)) {
String error = "指令不满足执行条件";
log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn());
InstructionResult result = InstructionResult.failure(error);
instruction.onComplete(context, result);
return CompletableFuture.completedFuture(result);
}
// b. 在线程池中执行远程调用避免阻塞当前线程
return CompletableFuture.supplyAsync(() -> {
try {
instruction.executeRemoteCall(context);
log.debug("远程调用已发送: instruction={}", instruction.getName());
return true;
} catch (Exception e) {
log.error("远程调用失败: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
return false;
}
}, commandExecutor).thenCompose(remoteCallSuccess -> {
if (!remoteCallSuccess) {
InstructionResult result = InstructionResult.failure("远程调用失败");
instruction.onComplete(context, result);
return result;
return CompletableFuture.completedFuture(result);
}
// b. 执行远程调用
instruction.executeRemoteCall(context);
log.debug("远程调用已发送: instruction={}", instruction.getName());
// c. 等待方法回调
// c. 等待方法回调异步
CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
if (methodCallback != null) {
InstructionResult methodResult = waitForCallback(methodCallback, context);
if (!methodResult.isSuccess()) {
instruction.onComplete(context, methodResult);
return methodResult;
}
return waitForCallbackAsync(methodCallback, context)
.thenCompose(methodResult -> {
if (!methodResult.isSuccess()) {
instruction.onComplete(context, methodResult);
return CompletableFuture.completedFuture(methodResult);
}
// d. 等待状态回调异步
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
if (stateCallback != null) {
return waitForCallbackAsync(stateCallback, context)
.thenApply(stateResult -> {
instruction.onComplete(context, stateResult);
return stateResult;
});
}
// 没有状态回调直接成功
InstructionResult result = InstructionResult.success();
instruction.onComplete(context, result);
return CompletableFuture.completedFuture(result);
});
}
// d. 等待状态回调
// 没有方法回调检查是否有状态回调
CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
if (stateCallback != null) {
InstructionResult stateResult = waitForCallback(stateCallback, context);
// 注意这里不立即返回而是将结果传递给上层由上层决定下一步
instruction.onComplete(context, stateResult);
return stateResult;
return waitForCallbackAsync(stateCallback, context)
.thenApply(stateResult -> {
instruction.onComplete(context, stateResult);
return stateResult;
});
}
// 指令执行成功
// 没有任何回调直接成功
InstructionResult result = InstructionResult.success();
instruction.onComplete(context, result);
return result;
} catch (Exception e) {
log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
InstructionResult result = InstructionResult.failure("指令执行异常: " + e.getMessage());
instruction.onComplete(context, result);
return result;
}
return CompletableFuture.completedFuture(result);
});
}
/**
* 等待回调
* 异步等待回调不阻塞线程
*
* 关键改进
* 1. 不使用 future.get() 阻塞线程
* 2. 使用 ScheduledExecutorService 实现超时
* 3. 完全基于回调机制
*
* @param callbackConfig 回调配置
* @param context 执行上下文
* @return CompletableFuture<InstructionResult>
*/
private InstructionResult waitForCallback(CallbackConfig callbackConfig, InstructionContext context) {
private CompletableFuture<InstructionResult> waitForCallbackAsync(
CallbackConfig callbackConfig,
InstructionContext context) {
CompletableFuture<InstructionResult> future = new CompletableFuture<>();
AtomicBoolean callbackReceived = new AtomicBoolean(false);
// 注册回调
String callbackId = callbackRegistry.registerCallback(
callbackConfig.getTopic(),
messageBody -> {
if (callbackReceived.get()) {
return; // 已经收到回调忽略后续消息
}
callbackConfig.getTopic(),
messageBody -> {
// 使用 CAS 确保只处理一次
if (callbackReceived.compareAndSet(false, true)) {
// 判断消息是否匹配
if (callbackConfig.matches(messageBody)) {
callbackReceived.set(true);
future.complete(InstructionResult.success(messageBody));
log.debug("收到匹配的回调消息: topic={}", callbackConfig.getTopic());
}
},
callbackConfig.getTimeoutMs()
}
},
callbackConfig.getTimeoutMs()
);
try {
// 等待回调或超时
return future.get(callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("等待回调超时: topic={}, timeout={}ms", callbackConfig.getTopic(), callbackConfig.getTimeoutMs());
return InstructionResult.timeout();
} finally {
// 取消注册回调
// 设置超时不阻塞线程
timeoutScheduler.schedule(() -> {
// 使用 CAS 确保只处理一次
if (callbackReceived.compareAndSet(false, true)) {
future.complete(InstructionResult.timeout());
log.warn("等待回调超时: topic={}, timeout={}ms",
callbackConfig.getTopic(), callbackConfig.getTimeoutMs());
}
}, callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
// 清理回调无论成功还是超时
return future.whenComplete((result, throwable) -> {
callbackRegistry.unregisterCallback(callbackId);
}
});
}
}

View File

@ -0,0 +1,72 @@
package com.tuoheng.machine.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置
* 用于命令执行的异步任务
*/
@Slf4j
@Configuration
public class ExecutorConfig {
/**
* 命令执行线程池
*
* 设计说明
* 1. 核心线程数CPU 核心数 * 2适合 I/O 密集型任务
* 2. 最大线程数200控制最大并发防止线程爆炸
* 3. 队列容量10000缓冲等待执行的任务
* 4. 拒绝策略CallerRunsPolicy背压机制让调用者线程执行
*
* 性能预估
* - 假设每个命令平均执行 10
* - 200 个线程可以同时处理 200 个命令
* - 队列可以缓冲 10000 个命令
* - 总容量10200 个并发命令
* - 吞吐量200 / 10 = 20 个命令/
*/
@Bean(name = "commandExecutor")
public Executor commandExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数根据 CPU 核心数设置
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
executor.setCorePoolSize(corePoolSize);
log.info("命令执行线程池核心线程数: {}", corePoolSize);
// 最大线程数限制最大并发防止线程爆炸
executor.setMaxPoolSize(200);
// 队列容量缓冲等待执行的任务
executor.setQueueCapacity(10000);
// 拒绝策略队列满时调用者线程执行背压机制
// 这样可以防止任务丢失同时给系统施加背压
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 线程名称前缀方便日志追踪
executor.setThreadNamePrefix("cmd-exec-");
// 等待任务完成后再关闭优雅关闭
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
// 允许核心线程超时节省资源
executor.setAllowCoreThreadTimeOut(true);
executor.setKeepAliveSeconds(60);
executor.initialize();
log.info("命令执行线程池初始化完成: corePoolSize={}, maxPoolSize={}, queueCapacity={}",
corePoolSize, 200, 10000);
return executor;
}
}