/*
 * Source path: thingsboard-client-demo/src/main/java/com/tuoheng/machine/command/TransactionExecutor.java
 * (277 lines, 12 KiB, Java)
 *
 * Note carried over from the repository web viewer: this file contains Unicode characters
 * that might be confused with other characters (for example full-width punctuation in
 * comments and log messages). If that is intentional, the warning can be safely ignored.
 */

package com.tuoheng.machine.command;
import com.tuoheng.machine.instruction.*;
import com.tuoheng.machine.mqtt.MqttCallbackRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Transaction executor (fully asynchronous variant).
 *
 * <p>A transaction is a tree of {@link Instruction}s. Each instruction is executed, its
 * result selects the next instruction via {@code getNextInstruction(success)}, and the
 * result of the last instruction (the one with no successor) becomes the result of the
 * transaction.
 *
 * <p>Design notes:
 * <ol>
 *   <li>No method in this class blocks while waiting for a device reply; every step is
 *       chained through {@link CompletableFuture}.</li>
 *   <li>Remote calls are dispatched on the injected {@code commandExecutor}; callback
 *       timeouts are driven by a small dedicated scheduler instead of a blocked thread
 *       per command.</li>
 *   <li>The intent (per the original author's notes) is that a very large number of
 *       in-flight commands costs one pending future each rather than one blocked thread
 *       each.</li>
 * </ol>
 */
@Slf4j
@Component
public class TransactionExecutor {

    private final MqttCallbackRegistry callbackRegistry;
    private final Executor commandExecutor;
    private final ScheduledExecutorService timeoutScheduler;

    /**
     * Creates the executor and its private timeout scheduler.
     *
     * @param callbackRegistry registry used to subscribe to / unsubscribe from reply messages
     * @param commandExecutor  executor on which remote calls are performed
     */
    public TransactionExecutor(
            MqttCallbackRegistry callbackRegistry,
            @Qualifier("commandExecutor") Executor commandExecutor) {
        this.callbackRegistry = callbackRegistry;
        this.commandExecutor = commandExecutor;

        // Dedicated scheduler used only to fire callback timeouts (two daemon threads).
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
                2,
                r -> {
                    Thread t = new Thread(r, "timeout-scheduler");
                    t.setDaemon(true);
                    return t;
                });
        // Timeout tasks are cancelled as soon as the awaited callback arrives; drop them
        // from the work queue immediately instead of keeping them until their delay elapses.
        scheduler.setRemoveOnCancelPolicy(true);
        this.timeoutScheduler = scheduler;

        log.info("事务执行器初始化完成(完全异步模式)");
    }

    /**
     * Executes a transaction asynchronously, starting at its root instruction.
     *
     * @param transaction transaction definition
     * @param context     execution context shared by all instructions of the transaction
     * @return future that completes with the overall command result
     */
    public CompletableFuture<CommandResult> executeTransaction(Transaction transaction, InstructionContext context) {
        log.info("开始执行事务: transaction={}, sn={}", transaction.getName(), context.getSn());
        long startTime = System.currentTimeMillis();
        return executeInstructionTreeAsync(transaction, context, startTime, transaction.getRootInstruction());
    }

    /**
     * Executes {@code currentInstruction} and then, depending on its result, recursively
     * continues with the instruction returned by {@code getNextInstruction(success)}.
     *
     * <p>The transaction timeout is checked before each instruction is started; an
     * instruction that is already running is bounded only by its own callback timeouts.
     *
     * @param transaction        transaction definition
     * @param context            execution context
     * @param startTime          transaction start time ({@link System#currentTimeMillis()})
     * @param currentInstruction instruction to execute now; {@code null} yields a failure result
     * @return future that completes with the transaction result
     */
    private CompletableFuture<CommandResult> executeInstructionTreeAsync(
            Transaction transaction,
            InstructionContext context,
            long startTime,
            Instruction currentInstruction) {

        // A missing instruction at this point means the transaction has no root.
        if (currentInstruction == null) {
            log.error("事务没有根指令: transaction={}", transaction.getName());
            return CompletableFuture.completedFuture(
                    CommandResult.failure(transaction.getCommandType(), "事务没有根指令"));
        }

        // Stop walking the tree once the overall transaction budget is exhausted.
        if (System.currentTimeMillis() - startTime > transaction.getTimeoutMs()) {
            log.warn("事务执行超时: transaction={}, sn={}", transaction.getName(), context.getSn());
            return CompletableFuture.completedFuture(
                    CommandResult.timeout(transaction.getCommandType()));
        }

        log.debug("执行指令: instruction={}", currentInstruction.getName());

        return executeInstructionAsync(currentInstruction, context)
                .thenCompose(result -> {
                    Instruction nextInstruction = currentInstruction.getNextInstruction(result.isSuccess());
                    if (nextInstruction != null) {
                        // A successor exists for this outcome: keep walking the tree.
                        log.debug("根据执行结果选择下游指令: success={}, nextInstruction={}",
                                result.isSuccess(), nextInstruction.getName());
                        return executeInstructionTreeAsync(transaction, context, startTime, nextInstruction);
                    }

                    // Leaf instruction: its result is the result of the whole transaction.
                    if (!result.isSuccess()) {
                        log.error("指令执行失败(无下游指令): instruction={}, error={}",
                                currentInstruction.getName(), result.getErrorMessage());
                        return CompletableFuture.completedFuture(
                                CommandResult.failure(
                                        transaction.getCommandType(),
                                        result.getErrorMessage(),
                                        currentInstruction.getName()));
                    }
                    log.info("指令执行成功(无下游指令),事务完成: instruction={}, sn={}",
                            currentInstruction.getName(), context.getSn());
                    return CompletableFuture.completedFuture(
                            CommandResult.success(transaction.getCommandType()));
                });
    }

    /**
     * Executes a single instruction asynchronously.
     *
     * <p>Steps: (a) check {@code canExecute}; (b) perform the remote call on
     * {@code commandExecutor}; (c) wait for the method callback, if one is configured;
     * (d) wait for the state callback, if one is configured. Whatever result is produced,
     * {@code instruction.onComplete(context, result)} is invoked exactly once with it
     * before the returned future completes normally.
     *
     * @param instruction instruction to execute
     * @param context     execution context
     * @return future that completes with the instruction result
     */
    private CompletableFuture<InstructionResult> executeInstructionAsync(
            Instruction instruction,
            InstructionContext context) {
        log.debug("开始执行指令: instruction={}, sn={}", instruction.getName(), context.getSn());

        // a. Precondition check; nothing is sent when it fails.
        if (!instruction.canExecute(context)) {
            String error = "指令不满足执行条件";
            log.warn("指令不满足执行条件: instruction={}, sn={}", instruction.getName(), context.getSn());
            InstructionResult rejected = InstructionResult.failure(error);
            return CompletableFuture.completedFuture(notifyCompletion(instruction, context, rejected));
        }

        // b. Perform the remote call on the command executor, then c/d. wait for callbacks.
        CompletableFuture<InstructionResult> outcome = CompletableFuture
                .supplyAsync(() -> sendRemoteCall(instruction, context), commandExecutor)
                .thenCompose(sent -> {
                    if (!sent) {
                        InstructionResult failed = InstructionResult.failure("远程调用失败");
                        return CompletableFuture.completedFuture(failed);
                    }
                    return awaitCallbacksAsync(instruction, context);
                });

        // Single notification point for every outcome of the asynchronous path.
        return outcome.thenApply(result -> notifyCompletion(instruction, context, result));
    }

    /**
     * Performs the remote call of an instruction, converting any exception into {@code false}.
     *
     * @return {@code true} if the call was issued without throwing
     */
    private boolean sendRemoteCall(Instruction instruction, InstructionContext context) {
        try {
            instruction.executeRemoteCall(context);
            log.debug("远程调用已发送: instruction={}", instruction.getName());
            return true;
        } catch (Exception e) {
            log.error("远程调用失败: instruction={}, sn={}", instruction.getName(), context.getSn(), e);
            return false;
        }
    }

    /**
     * Waits for the method callback (if configured) and, when it succeeds or is absent,
     * for the state callback (if configured).
     *
     * <p>NOTE(review): callbacks are registered only after the remote call has been issued,
     * so a reply that arrives before registration would presumably be missed. Whether the
     * callback configuration can be computed before sending depends on
     * {@code Instruction} implementations not visible here; confirm before reordering.
     */
    private CompletableFuture<InstructionResult> awaitCallbacksAsync(
            Instruction instruction,
            InstructionContext context) {
        CallbackConfig methodCallback = instruction.getMethodCallbackConfig(context);
        if (methodCallback == null) {
            return awaitStateCallbackAsync(instruction, context);
        }
        return waitForCallbackAsync(methodCallback, context)
                .thenCompose(methodResult -> {
                    if (!methodResult.isSuccess()) {
                        // Method callback failed or timed out: that is the instruction result.
                        return CompletableFuture.completedFuture(methodResult);
                    }
                    return awaitStateCallbackAsync(instruction, context);
                });
    }

    /**
     * Waits for the state callback if one is configured; otherwise yields plain success.
     * The state callback configuration is read lazily, i.e. only when this step is reached.
     */
    private CompletableFuture<InstructionResult> awaitStateCallbackAsync(
            Instruction instruction,
            InstructionContext context) {
        CallbackConfig stateCallback = instruction.getStateCallbackConfig(context);
        if (stateCallback == null) {
            InstructionResult success = InstructionResult.success();
            return CompletableFuture.completedFuture(success);
        }
        return waitForCallbackAsync(stateCallback, context);
    }

    /** Invokes {@code instruction.onComplete} with {@code result} and returns that same result. */
    private InstructionResult notifyCompletion(
            Instruction instruction,
            InstructionContext context,
            InstructionResult result) {
        instruction.onComplete(context, result);
        return result;
    }

    /**
     * Waits, without blocking a thread, for a message matching {@code callbackConfig}.
     *
     * <p>The returned future completes with a success result carrying the first matching
     * message body, or with a timeout result once {@code callbackConfig.getTimeoutMs()}
     * has elapsed, whichever happens first. Messages that do not match are ignored and the
     * wait continues. When the future completes, the pending timeout task is cancelled and
     * the callback is unregistered.
     *
     * @param callbackConfig callback configuration (topic, timeout, tid/bid filters, matcher)
     * @param context        execution context
     * @return future that completes with the callback result or a timeout result
     */
    private CompletableFuture<InstructionResult> waitForCallbackAsync(
            CallbackConfig callbackConfig,
            InstructionContext context) {
        CompletableFuture<InstructionResult> future = new CompletableFuture<>();

        // Register the callback (the registry receives the tid/bid filter settings).
        String callbackId = callbackRegistry.registerCallback(
                callbackConfig.getTopic(),
                messageBody -> {
                    // Check the match BEFORE claiming completion: a non-matching message
                    // must not prevent a later matching message or the timeout from
                    // completing the future.
                    if (!callbackConfig.matches(messageBody)) {
                        return;
                    }
                    // complete() is atomic and returns true only for the first completer,
                    // so the callback/timeout race needs no separate flag.
                    if (future.complete(InstructionResult.success(messageBody))) {
                        log.debug("收到匹配的回调消息: topic={}", callbackConfig.getTopic());
                    }
                },
                callbackConfig.getTimeoutMs(),
                callbackConfig.getTidFieldPath(),
                callbackConfig.getExpectedTid(),
                callbackConfig.getBidFieldPath(),
                callbackConfig.getExpectedBid()
        );

        // Arm the timeout on the scheduler instead of blocking a thread.
        ScheduledFuture<?> timeoutTask = timeoutScheduler.schedule(() -> {
            if (future.complete(InstructionResult.timeout())) {
                log.warn("等待回调超时: topic={}, timeout={}ms",
                        callbackConfig.getTopic(), callbackConfig.getTimeoutMs());
            }
        }, callbackConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);

        // Clean up on either outcome: stop the timer and drop the registration.
        return future.whenComplete((result, throwable) -> {
            timeoutTask.cancel(false);
            callbackRegistry.unregisterCallback(callbackId);
        });
    }
}