package com.tuoheng.machine.command; import com.tuoheng.machine.instruction.*; import com.tuoheng.machine.mqtt.MqttCallbackRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** * 事务执行器(支持条件分支) */ @Slf4j @Component public class TransactionExecutor { private final MqttCallbackRegistry callbackRegistry; public TransactionExecutor(MqttCallbackRegistry callbackRegistry) { this.callbackRegistry = callbackRegistry; } /** * 执行事务 */ public CompletableFuture executeTransaction(Transaction transaction, InstructionContext context) { log.info("开始执行事务: transaction={}, sn={}", transaction.getName(), context.getSn()); CompletableFuture future = new CompletableFuture<>(); long startTime = System.currentTimeMillis(); // 在新线程中执行事务 CompletableFuture.runAsync(() -> { try { // 判断使用图模式还是线性模式 if (transaction.isGraphMode()) { // 新版本:图结构执行(支持条件分支) executeGraphTransaction(transaction, context, startTime, future); } else { // 旧版本:线性执行(兼容旧代码) executeLinearTransaction(transaction, context, startTime, future); } } catch (Exception e) { log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e); future.complete(CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage())); } }); return future; } /** * 执行图结构事务(新版本,支持条件分支) */ private void executeGraphTransaction(Transaction transaction, InstructionContext context, long startTime, CompletableFuture future) { // 从起始节点开始执行 InstructionNode currentNode = transaction.getStartNode(); if (currentNode == null) { log.error("事务没有起始节点: transaction={}", transaction.getName()); future.complete(CommandResult.failure(transaction.getCommandType(), "事务没有起始节点")); return; } // 循环执行节点,直到没有下一个节点 while (currentNode != null) { // 检查事务是否超时 if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); future.complete(CommandResult.timeout(transaction.getCommandType())); return; } Instruction instruction = currentNode.getInstruction(); log.debug("执行指令节点: nodeId={}, instruction={}", currentNode.getNodeId(), instruction.getName()); // 执行指令 InstructionResult result = executeInstruction(instruction, context); // 判断是否需要根据状态回调结果决定下一步 boolean shouldBranch = shouldBranchByStateCallback(instruction, context); if (shouldBranch) { // 根据状态回调结果决定下一步 String nextNodeId = currentNode.getNextNodeId(result.isSuccess()); if (nextNodeId == null) { // 找不到下一步指令,将状态回调结果作为事务结果返回 log.info("状态回调完成,无下一步指令,事务结束: success={}, nodeId={}", result.isSuccess(), currentNode.getNodeId()); if (result.isSuccess()) { future.complete(CommandResult.success(transaction.getCommandType(), result.getData())); } else { future.complete(CommandResult.failure( transaction.getCommandType(), result.getErrorMessage(), instruction.getName() )); } return; } // 获取下一个节点 currentNode = transaction.getNode(nextNodeId); log.debug("根据状态回调结果选择下一个节点: success={}, nextNodeId={}", result.isSuccess(), nextNodeId); } else { // 不需要分支,按照传统逻辑处理 if (!result.isSuccess()) { // 指令失败,检查是否有失败分支 String nextNodeId = currentNode.getNextNodeId(false); if (nextNodeId != null) { currentNode = transaction.getNode(nextNodeId); log.debug("指令失败,跳转到失败分支: nextNodeId={}", nextNodeId); } else { // 没有失败分支,终止事务 log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage()); future.complete(CommandResult.failure( transaction.getCommandType(), result.getErrorMessage(), instruction.getName() )); return; } } else { // 指令成功,获取下一个节点 String nextNodeId = currentNode.getNextNodeId(true); if (nextNodeId != null) { currentNode = transaction.getNode(nextNodeId); log.debug("指令成功,继续下一个节点: nextNodeId={}", nextNodeId); } else { // 没有下一个节点,事务成功完成 log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn()); future.complete(CommandResult.success(transaction.getCommandType())); return; } } } } // 正常结束(虽然理论上不会走到这里) log.info("事务执行完成: transaction={}, sn={}", transaction.getName(), context.getSn()); future.complete(CommandResult.success(transaction.getCommandType())); } /** * 执行线性事务(旧版本,兼容旧代码) */ private void executeLinearTransaction(Transaction transaction, InstructionContext context, long startTime, CompletableFuture future) { // 依次执行每个指令 for (Instruction instruction : transaction.getInstructions()) { // 检查事务是否超时 if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) { log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn()); future.complete(CommandResult.timeout(transaction.getCommandType())); return; } // 执行指令 InstructionResult result = executeInstruction(instruction, context); // 如果指令失败,终止事务 if (!result.isSuccess()) { log.error("指令执行失败: instruction={}, error={}", instruction.getName(), result.getErrorMessage()); future.complete(CommandResult.failure( transaction.getCommandType(), result.getErrorMessage(), instruction.getName() )); return; } log.debug("指令执行成功: instruction={}", instruction.getName()); } // 所有指令执行成功 log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn()); future.complete(CommandResult.success(transaction.getCommandType())); } /** * 判断是否需要根据状态回调结果决定下一步 * 条件:配置了 stateCallback 且 canShortCircuit 为 false */ private boolean shouldBranchByStateCallback(Instruction instruction, InstructionContext context) { CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); return stateCallback != null && !stateCallback.isCanShortCircuit(); } /** * 执行单个指令 */ private InstructionResult executeInstruction(Instruction instruction, InstructionContext context) { log.debug("开始执行指令: instruction={}, sn={}", instruction.getName(), context.getSn()); try { // a. 判断是否可以执行 if (!instruction.canExecute(context)) { String error = "指令不满足执行条件"; log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn()); InstructionResult result = InstructionResult.failure(error); instruction.onComplete(context, result); return result; } // b. 执行远程调用 instruction.executeRemoteCall(context); log.debug("远程调用已发送: instruction={}", instruction.getName()); // c. 等待方法回调 CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context); if (methodCallback != null && !methodCallback.isCanShortCircuit()) { InstructionResult methodResult = waitForCallback(methodCallback, context); if (!methodResult.isSuccess()) { instruction.onComplete(context, methodResult); return methodResult; } } // d. 等待状态回调 CallbackConfig stateCallback = instruction.getStateCallbackConfig(context); if (stateCallback != null && !stateCallback.isCanShortCircuit()) { InstructionResult stateResult = waitForCallback(stateCallback, context); // 注意:这里不立即返回,而是将结果传递给上层,由上层决定下一步 instruction.onComplete(context, stateResult); return stateResult; } // 指令执行成功 InstructionResult result = InstructionResult.success(); instruction.onComplete(context, result); return result; } catch (Exception e) { log.error("指令执行异常: instruction={}, sn=", instruction.getName(), context.getSn(), e); InstructionResult result = InstructionResult.failure("指令执行异常: " + e.getMessage()); instruction.onComplete(context, result); return result; } } /** * 等待回调 */ private InstructionResult waitForCallback(CallbackConfig callbackConfig, InstructionContext context) { CompletableFuture future = new CompletableFuture<>(); AtomicBoolean callbackReceived = new AtomicBoolean(false); // 注册回调 String callbackId = callbackRegistry.registerCallback( callbackConfig.getTopic(), messageBody -> { if (callbackReceived.get()) { return; // 已经收到回调,忽略后续消息 } // 判断消息是否匹配 if (callbackConfig.matches(messageBody)) { callbackReceived.set(true); future.complete(InstructionResult.success(messageBody)); log.debug("收到匹配的回调消息: topic={}", callbackConfig.getTopic()); } }, callbackConfig.getTimeoutMs() ); try { // 等待回调或超时 InstructionResult result = future.get(callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); return result; } catch (Exception e) { log.warn("等待回调超时: topic={}, timeout={}ms", callbackConfig.getTopic(), callbackConfig.getTimeoutMs()); return InstructionResult.timeout(); } finally { // 取消注册回调 callbackRegistry.unregisterCallback(callbackId); } } }