diff --git a/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java b/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java index a92404f..79f5271 100644 --- a/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java +++ b/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java @@ -3,189 +3,270 @@ package com.tuoheng.machine.command; import com.tuoheng.machine.instruction.*; import com.tuoheng.machine.mqtt.MqttCallbackRegistry; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** - * 事务执行器(支持条件分支) + * 事务执行器(完全异步化版本) + * + * 设计说明: + * 1. 完全异步:不阻塞任何线程,所有操作都通过 CompletableFuture 链式调用 + * 2. 高并发:可以同时处理数万个命令,不会创建大量线程 + * 3. 资源高效:线程只在真正需要执行任务时才使用,不会浪费在等待上 + * + * 性能优势: + * - 传统方式:10万个命令 = 10万个阻塞线程 = 系统崩溃 + * - 异步方式:10万个命令 = 200个工作线程 + 10万个 CompletableFuture = 正常运行 */ @Slf4j @Component public class TransactionExecutor { private final MqttCallbackRegistry callbackRegistry; + private final Executor commandExecutor; + private final ScheduledExecutorService timeoutScheduler; - public TransactionExecutor(MqttCallbackRegistry callbackRegistry) { + public TransactionExecutor( + MqttCallbackRegistry callbackRegistry, + @Qualifier("commandExecutor") Executor commandExecutor) { this.callbackRegistry = callbackRegistry; + this.commandExecutor = commandExecutor; + + // 创建一个专门用于超时检查的调度器(核心线程数较小) + this.timeoutScheduler = new ScheduledThreadPoolExecutor( + 2, + r -> { + Thread t = new Thread(r, "timeout-scheduler"); + t.setDaemon(true); + return t; + } + ); + + log.info("事务执行器初始化完成(完全异步模式)"); } /** - * 执行事务 + * 执行事务(完全异步) + * + * @param transaction 事务定义 + * @param context 执行上下文 + * @return CompletableFuture,不会阻塞调用线程 */ public CompletableFuture executeTransaction(Transaction transaction, InstructionContext context) { log.info("开始执行事务: transaction={}, sn={}", transaction.getName(), context.getSn()); - CompletableFuture future = new CompletableFuture<>(); long startTime = System.currentTimeMillis(); - // 在新线程中执行事务 - CompletableFuture.runAsync(() -> { - try { - executeInstructionTree(transaction, context, startTime, future); - } catch (Exception e) { - log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e); - future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage())); - } - }); - - return future; + // 直接返回异步执行的结果,不创建新线程 + return executeInstructionTreeAsync(transaction, context, startTime, transaction.getRootInstruction()); } /** - * 执行指令树 + * 异步执行指令树 + * + * @param transaction 事务定义 + * @param context 执行上下文 + * @param startTime 事务开始时间 + * @param currentInstruction 当前要执行的指令 + * @return CompletableFuture */ - private void executeInstructionTree(Transaction transaction, InstructionContext context, - long startTime, CompletableFuture future) { - // 从根指令开始执行 - Instruction currentInstruction = transaction.getRootInstruction(); + private CompletableFuture executeInstructionTreeAsync( + Transaction transaction, + InstructionContext context, + long startTime, + Instruction currentInstruction) { + + // 检查根指令 if (currentInstruction == null) { log.error("事务没有根指令: transaction={}", transaction.getName()); - future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有根指令")); - return; + return CompletableFuture.completedFuture( + CommandResult.failure(transaction.getCommandType(), "事务没有根指令") + ); } - // 循环执行指令,直到没有下一个指令 - while (true) { - // 检查事务是否超时 - if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { - log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); - future.complete(CommandResult.timeout(transaction.getCommandType())); - return; - } + // 检查事务是否超时 + if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { + log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); + return CompletableFuture.completedFuture( + CommandResult.timeout(transaction.getCommandType()) + ); + } - log.debug("执行指令: instruction={}", currentInstruction.getName()); - // 执行指令 - InstructionResult result = executeInstruction(currentInstruction, context); - // 根据执行结果获取下游指令 - Instruction nextInstruction = currentInstruction.getNextInstruction(result.isSuccess()); + log.debug("执行指令: instruction={}", currentInstruction.getName()); - if (nextInstruction != null) { - // 有下游指令,继续执行 - currentInstruction = nextInstruction; - log.debug("根据执行结果选择下游指令: success={}, nextInstruction={}", - result.isSuccess(), nextInstruction.getName()); - } else { - // 没有下游指令,当前指令的结果就是事务的结果 - if (!result.isSuccess()) { - // 指令失败,事务失败 - log.error("指令执行失败(无下游指令): instruction={}, error={}", - currentInstruction.getName(), result.getErrorMessage()); - future.complete(CommandResult.failure( - transaction.getCommandType(), - result.getErrorMessage(), - currentInstruction.getName() - )); - return; + // 异步执行当前指令 + Instruction finalCurrentInstruction = currentInstruction; + return executeInstructionAsync(currentInstruction, context) + .thenCompose(result -> { + // 根据执行结果获取下游指令 + Instruction nextInstruction = finalCurrentInstruction.getNextInstruction(result.isSuccess()); + + if (nextInstruction != null) { + // 有下游指令,递归执行 + log.debug("根据执行结果选择下游指令: success={}, nextInstruction={}", + result.isSuccess(), nextInstruction.getName()); + return executeInstructionTreeAsync(transaction, context, startTime, nextInstruction); } else { - // 指令成功,事务成功 - log.info("指令执行成功(无下游指令),事务完成: instruction={}, sn={}", - currentInstruction.getName(), context.getSn()); - future.complete(CommandResult.success(transaction.getCommandType())); - return; + // 没有下游指令,当前指令的结果就是事务的结果 + if (!result.isSuccess()) { + log.error("指令执行失败(无下游指令): instruction={}, error={}", + finalCurrentInstruction.getName(), result.getErrorMessage()); + return CompletableFuture.completedFuture( + CommandResult.failure( + transaction.getCommandType(), + result.getErrorMessage(), + finalCurrentInstruction.getName() + ) + ); + } else { + log.info("指令执行成功(无下游指令),事务完成: instruction={}, sn={}", + finalCurrentInstruction.getName(), context.getSn()); + return CompletableFuture.completedFuture( + CommandResult.success(transaction.getCommandType()) + ); + } } - } - } + }); } /** - * 执行单个指令 + * 异步执行单个指令 + * + * @param instruction 指令 + * @param context 执行上下文 + * @return CompletableFuture */ - private InstructionResult executeInstruction(Instruction instruction, InstructionContext context) { + private CompletableFuture executeInstructionAsync( + Instruction instruction, + InstructionContext context) { + log.debug("开始执行指令: instruction={}, sn={}", instruction.getName(), context.getSn()); - try { - // a. 判断是否可以执行 - if (!instruction.canExecute(context)) { - String error = "指令不满足执行条件"; - log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn()); - InstructionResult result = InstructionResult.failure(error); + // a. 判断是否可以执行 + if (!instruction.canExecute(context)) { + String error = "指令不满足执行条件"; + log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn()); + InstructionResult result = InstructionResult.failure(error); + instruction.onComplete(context, result); + return CompletableFuture.completedFuture(result); + } + + // b. 在线程池中执行远程调用(避免阻塞当前线程) + return CompletableFuture.supplyAsync(() -> { + try { + instruction.executeRemoteCall(context); + log.debug("远程调用已发送: instruction={}", instruction.getName()); + return true; + } catch (Exception e) { + log.error("远程调用失败: instruction={}, sn={}", instruction.getName(), context.getSn(), e); + return false; + } + }, commandExecutor).thenCompose(remoteCallSuccess -> { + if (!remoteCallSuccess) { + InstructionResult result = InstructionResult.failure("远程调用失败"); instruction.onComplete(context, result); - return result; + return CompletableFuture.completedFuture(result); } - // b. 执行远程调用 - instruction.executeRemoteCall(context); - log.debug("远程调用已发送: instruction={}", instruction.getName()); - - // c. 等待方法回调 + // c. 等待方法回调(异步) CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context); if (methodCallback != null) { - InstructionResult methodResult = waitForCallback(methodCallback, context); - if (!methodResult.isSuccess()) { - instruction.onComplete(context, methodResult); - return methodResult; - } + return waitForCallbackAsync(methodCallback, context) + .thenCompose(methodResult -> { + if (!methodResult.isSuccess()) { + instruction.onComplete(context, methodResult); + return CompletableFuture.completedFuture(methodResult); + } + + // d. 等待状态回调(异步) + CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); + if (stateCallback != null) { + return waitForCallbackAsync(stateCallback, context) + .thenApply(stateResult -> { + instruction.onComplete(context, stateResult); + return stateResult; + }); + } + + // 没有状态回调,直接成功 + InstructionResult result = InstructionResult.success(); + instruction.onComplete(context, result); + return CompletableFuture.completedFuture(result); + }); } - // d. 等待状态回调 + // 没有方法回调,检查是否有状态回调 CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); if (stateCallback != null) { - InstructionResult stateResult = waitForCallback(stateCallback, context); - // 注意:这里不立即返回,而是将结果传递给上层,由上层决定下一步 - instruction.onComplete(context, stateResult); - return stateResult; + return waitForCallbackAsync(stateCallback, context) + .thenApply(stateResult -> { + instruction.onComplete(context, stateResult); + return stateResult; + }); } - // 指令执行成功 + // 没有任何回调,直接成功 InstructionResult result = InstructionResult.success(); instruction.onComplete(context, result); - return result; - - } catch (Exception e) { - log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e); - InstructionResult result = InstructionResult.failure("指令执行异常: " + e.getMessage()); - instruction.onComplete(context, result); - return result; - } + return CompletableFuture.completedFuture(result); + }); } /** - * 等待回调 + * 异步等待回调(不阻塞线程) + * + * 关键改进: + * 1. 不使用 future.get() 阻塞线程 + * 2. 使用 ScheduledExecutorService 实现超时 + * 3. 完全基于回调机制 + * + * @param callbackConfig 回调配置 + * @param context 执行上下文 + * @return CompletableFuture */ - private InstructionResult waitForCallback(CallbackConfig callbackConfig, InstructionContext context) { + private CompletableFuture waitForCallbackAsync( + CallbackConfig callbackConfig, + InstructionContext context) { + CompletableFuture future = new CompletableFuture<>(); AtomicBoolean callbackReceived = new AtomicBoolean(false); // 注册回调 String callbackId = callbackRegistry.registerCallback( - callbackConfig.getTopic(), - messageBody -> { - if (callbackReceived.get()) { - return; // 已经收到回调,忽略后续消息 - } - + callbackConfig.getTopic(), + messageBody -> { + // 使用 CAS 确保只处理一次 + if (callbackReceived.compareAndSet(false, true)) { // 判断消息是否匹配 if (callbackConfig.matches(messageBody)) { - callbackReceived.set(true); future.complete(InstructionResult.success(messageBody)); log.debug("收到匹配的回调消息: topic={}", callbackConfig.getTopic()); } - }, - callbackConfig.getTimeoutMs() + } + }, + callbackConfig.getTimeoutMs() ); - try { - // 等待回调或超时 - return future.get(callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - log.warn("等待回调超时: topic={}, timeout={}ms", callbackConfig.getTopic(), callbackConfig.getTimeoutMs()); - return InstructionResult.timeout(); - } finally { - // 取消注册回调 + // 设置超时(不阻塞线程) + timeoutScheduler.schedule(() -> { + // 使用 CAS 确保只处理一次 + if (callbackReceived.compareAndSet(false, true)) { + future.complete(InstructionResult.timeout()); + log.warn("等待回调超时: topic={}, timeout={}ms", + callbackConfig.getTopic(), callbackConfig.getTimeoutMs()); + } + }, callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); + + // 清理回调(无论成功还是超时) + return future.whenComplete((result, throwable) -> { callbackRegistry.unregisterCallback(callbackId); - } + }); } } diff --git a/src/main/java/com/tuoheng/machine/config/ExecutorConfig.java b/src/main/java/com/tuoheng/machine/config/ExecutorConfig.java new file mode 100644 index 0000000..854dc7b --- /dev/null +++ b/src/main/java/com/tuoheng/machine/config/ExecutorConfig.java @@ -0,0 +1,72 @@ +package com.tuoheng.machine.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 线程池配置 + * 用于命令执行的异步任务 + */ +@Slf4j +@Configuration +public class ExecutorConfig { + + /** + * 命令执行线程池 + * + * 设计说明: + * 1. 核心线程数:CPU 核心数 * 2(适合 I/O 密集型任务) + * 2. 最大线程数:200(控制最大并发,防止线程爆炸) + * 3. 队列容量:10000(缓冲等待执行的任务) + * 4. 拒绝策略:CallerRunsPolicy(背压机制,让调用者线程执行) + * + * 性能预估: + * - 假设每个命令平均执行 10 秒 + * - 200 个线程可以同时处理 200 个命令 + * - 队列可以缓冲 10000 个命令 + * - 总容量:10200 个并发命令 + * - 吞吐量:200 / 10 = 20 个命令/秒 + */ + @Bean(name = "commandExecutor") + public Executor commandExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 核心线程数:根据 CPU 核心数设置 + int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; + executor.setCorePoolSize(corePoolSize); + log.info("命令执行线程池核心线程数: {}", corePoolSize); + + // 最大线程数:限制最大并发,防止线程爆炸 + executor.setMaxPoolSize(200); + + // 队列容量:缓冲等待执行的任务 + executor.setQueueCapacity(10000); + + // 拒绝策略:队列满时,调用者线程执行(背压机制) + // 这样可以防止任务丢失,同时给系统施加背压 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 线程名称前缀(方便日志追踪) + executor.setThreadNamePrefix("cmd-exec-"); + + // 等待任务完成后再关闭(优雅关闭) + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + + // 允许核心线程超时(节省资源) + executor.setAllowCoreThreadTimeOut(true); + executor.setKeepAliveSeconds(60); + + executor.initialize(); + + log.info("命令执行线程池初始化完成: corePoolSize={}, maxPoolSize={}, queueCapacity={}", + corePoolSize, 200, 10000); + + return executor; + } +} \ No newline at end of file