package com.tuoheng.machine.command;

import com.tuoheng.machine.instruction.*;
import com.tuoheng.machine.mqtt.MqttCallbackRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Transaction executor.
 *
 * <p>Runs the instructions of a {@link Transaction} one after another on an asynchronous task.
 * For each instruction it checks the execution precondition, sends the remote call, and then
 * waits for the method callback and the state callback (unless the respective callback config
 * is flagged as short-circuitable). The first failing instruction aborts the transaction.
 */
@Slf4j
@Component
public class TransactionExecutor {

    private final MqttCallbackRegistry callbackRegistry;

    public TransactionExecutor(MqttCallbackRegistry callbackRegistry) {
        this.callbackRegistry = callbackRegistry;
    }

    /**
     * Executes a transaction asynchronously.
     *
     * <p>The work is scheduled with {@link CompletableFuture#runAsync(Runnable)}, i.e. on the
     * default asynchronous executor; the task blocks while waiting for callbacks.
     *
     * @param transaction the transaction whose instructions are executed in order
     * @param context     the instruction context passed to every instruction
     * @return a future completed with a success, failure or timeout {@link CommandResult};
     *         it completes exceptionally only if something that is not an {@link Exception}
     *         (an {@link Error}) escapes the task, so the caller never waits forever
     */
    public CompletableFuture<CommandResult> executeTransaction(Transaction transaction, InstructionContext context) {
        log.info("开始执行事务: transaction={}, sn={}", transaction.getName(), context.getSn());

        CompletableFuture<CommandResult> future = new CompletableFuture<>();
        long startTime = System.currentTimeMillis();

        // Run the transaction on an asynchronous task.
        CompletableFuture
                .runAsync(() -> future.complete(runInstructions(transaction, context, startTime)))
                .whenComplete((ignored, error) -> {
                    // runInstructions already maps every Exception to a failure result, so this
                    // branch is only reached for Errors. Without it the returned future would
                    // stay incomplete and the caller would hang.
                    if (error != null) {
                        log.error("事务执行异常: transaction={}, sn={}",
                                transaction.getName(), context.getSn(), error);
                        future.completeExceptionally(error);
                    }
                });

        return future;
    }

    /**
     * Runs every instruction of the transaction in order and maps the outcome to a command result.
     *
     * <p>The transaction timeout is checked before each instruction starts; an instruction that is
     * already running is bounded only by its own callback timeouts.
     */
    private CommandResult runInstructions(Transaction transaction, InstructionContext context, long startTime) {
        try {
            for (Instruction instruction : transaction.getInstructions()) {
                // Check whether the transaction as a whole has timed out.
                if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
                    log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
                    return CommandResult.timeout(transaction.getCommandType());
                }

                InstructionResult result = executeInstruction(instruction, context);

                // A failed instruction terminates the transaction.
                if (!result.isSuccess()) {
                    log.error("指令执行失败: instruction={}, error={}",
                            instruction.getName(), result.getErrorMessage());
                    return CommandResult.failure(
                            transaction.getCommandType(),
                            result.getErrorMessage(),
                            instruction.getName()
                    );
                }

                log.debug("指令执行成功: instruction={}", instruction.getName());
            }

            // All instructions succeeded.
            log.info("事务执行成功: transaction={}, sn={}", transaction.getName(), context.getSn());
            return CommandResult.success(transaction.getCommandType());
        } catch (Exception e) {
            log.error("事务执行异常: transaction={}, sn={}", transaction.getName(), context.getSn(), e);
            return CommandResult.failure(transaction.getCommandType(), "事务执行异常: " + e.getMessage());
        }
    }

    /**
     * Executes a single instruction and notifies it of the outcome.
     *
     * <p>{@code instruction.onComplete} is invoked exactly once with the final result. If the
     * completion hook itself throws, the exception is logged and a failure result is returned.
     *
     * @return the instruction result; never propagates an {@link Exception}
     */
    private InstructionResult executeInstruction(Instruction instruction, InstructionContext context) {
        log.debug("开始执行指令: instruction={}, sn={}", instruction.getName(), context.getSn());

        InstructionResult result;
        try {
            result = runInstructionSteps(instruction, context);
        } catch (Exception e) {
            log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
            result = InstructionResult.failure("指令执行异常: " + e.getMessage());
        }

        // The hook is called outside the try block above so that a throwing hook is not
        // invoked a second time from the exception handler.
        try {
            instruction.onComplete(context, result);
        } catch (Exception e) {
            log.error("指令执行异常: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
            return InstructionResult.failure("指令执行异常: " + e.getMessage());
        }
        return result;
    }

    /**
     * Performs the precondition check, the remote call and the callback waits of one instruction.
     */
    private InstructionResult runInstructionSteps(Instruction instruction, InstructionContext context) {
        // a. Check whether the instruction may be executed.
        if (!instruction.canExecute(context)) {
            log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn());
            return InstructionResult.failure("指令不满足执行条件");
        }

        // b. Send the remote call.
        // NOTE(review): callbacks are registered only after the call has been sent, so a reply
        // arriving before registration would presumably be missed - confirm whether the callback
        // configs can be obtained and registered before executeRemoteCall.
        instruction.executeRemoteCall(context);
        log.debug("远程调用已发送: instruction={}", instruction.getName());

        // c. Wait for the method callback, unless none is configured or it can be short-circuited.
        CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
        if (methodCallback != null && !methodCallback.isCanShortCircuit()) {
            InstructionResult methodResult = waitForCallback(methodCallback, context);
            if (!methodResult.isSuccess()) {
                return methodResult;
            }
        }

        // d. Wait for the state callback under the same rules.
        CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
        if (stateCallback != null && !stateCallback.isCanShortCircuit()) {
            InstructionResult stateResult = waitForCallback(stateCallback, context);
            if (!stateResult.isSuccess()) {
                return stateResult;
            }
        }

        return InstructionResult.success();
    }

    /**
     * Registers a callback on the configured topic and blocks until a matching message arrives
     * or the configured timeout elapses. The callback is always unregistered before returning.
     *
     * @return a success result carrying the matching message body, a timeout result when no
     *         matching message arrives in time, or a failure result when the wait is interrupted
     *         (the thread's interrupt flag is restored) or the wait itself fails
     */
    private InstructionResult waitForCallback(CallbackConfig callbackConfig, InstructionContext context) {
        CompletableFuture<InstructionResult> future = new CompletableFuture<>();
        AtomicBoolean callbackReceived = new AtomicBoolean(false);

        String callbackId = callbackRegistry.registerCallback(
                callbackConfig.getTopic(),
                messageBody -> {
                    if (callbackReceived.get()) {
                        return; // A matching message was already accepted; ignore later ones.
                    }
                    // compareAndSet guarantees that only one matching message wins even when
                    // messages are delivered concurrently.
                    if (callbackConfig.matches(messageBody) && callbackReceived.compareAndSet(false, true)) {
                        future.complete(InstructionResult.success(messageBody));
                        log.debug("收到匹配的回调消息: topic={}", callbackConfig.getTopic());
                    }
                },
                callbackConfig.getTimeoutMs()
        );

        try {
            return future.get(callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            log.warn("等待回调超时: topic={}, timeout={}ms",
                    callbackConfig.getTopic(), callbackConfig.getTimeoutMs());
            return InstructionResult.timeout();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.warn("等待回调被中断: topic={}", callbackConfig.getTopic());
            return InstructionResult.failure("等待回调被中断");
        } catch (ExecutionException e) {
            log.error("等待回调异常: topic={}", callbackConfig.getTopic(), e);
            return InstructionResult.failure("等待回调异常: " + e.getMessage());
        } finally {
            callbackRegistry.unregisterCallback(callbackId);
        }
    }
}