package com.tuoheng.machine.mqtt; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; /** * MQTT回调注册中心 * 用于注册和管理MQTT消息的回调处理器,他的 handleMessage 需要被真实的MQTT回调去调用 */ @Slf4j @Component public class MqttCallbackRegistry { /** * 主题 -> 回调处理器列表 */ private final Map> topicHandlers = new ConcurrentHashMap<>(); /** * 回调ID -> 回调处理器 */ private final Map handlerMap = new ConcurrentHashMap<>(); /** * 注册回调 * * @param topic 监听的主题 * @param messageHandler 消息处理器 * @param timeoutMs 超时时间(毫秒) * @return 回调ID(用于取消注册) */ public String registerCallback(String topic, Consumer messageHandler, long timeoutMs) { String callbackId = UUID.randomUUID().toString(); MqttCallbackHandler handler = new MqttCallbackHandler( callbackId, topic, messageHandler, timeoutMs, System.currentTimeMillis() ); topicHandlers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(handler); handlerMap.put(callbackId, handler); log.debug("注册MQTT回调: callbackId={}, topic={}, timeoutMs={}", callbackId, topic, timeoutMs); return callbackId; } /** * 取消注册回调 * * @param callbackId 回调ID */ public void unregisterCallback(String callbackId) { MqttCallbackHandler handler = handlerMap.remove(callbackId); if (handler != null) { CopyOnWriteArrayList handlers = topicHandlers.get(handler.getTopic()); if (handlers != null) { handlers.remove(handler); if (handlers.isEmpty()) { topicHandlers.remove(handler.getTopic()); } } log.debug("取消注册MQTT回调: callbackId={}, topic={}", callbackId, handler.getTopic()); } } /** * 处理接收到的MQTT消息 * * @param topic 主题 * @param messageBody 消息体 */ public void handleMessage(String topic, Object messageBody) { CopyOnWriteArrayList handlers = topicHandlers.get(topic); if (handlers == null || handlers.isEmpty()) { return; } log.debug("处理MQTT消息: topic={}, handlerCount={}", topic, handlers.size()); for (MqttCallbackHandler handler : handlers) { try { // 检查是否超时 if (handler.isTimeout()) { log.warn("MQTT回调已超时: callbackId={}, topic={}", handler.getCallbackId(), topic); unregisterCallback(handler.getCallbackId()); continue; } // 执行回调 handler.getMessageHandler().accept(messageBody); } catch (Exception e) { log.error("执行MQTT回调失败: callbackId={}, topic={}", handler.getCallbackId(), topic, e); } } } /** * 清理超时的回调 */ public void cleanupTimeoutCallbacks() { handlerMap.values().removeIf(handler -> { if (handler.isTimeout()) { log.warn("清理超时的MQTT回调: callbackId={}, topic={}", handler.getCallbackId(), handler.getTopic()); unregisterCallback(handler.getCallbackId()); return true; } return false; }); } /** * 获取当前注册的回调数量 */ public int getCallbackCount() { return handlerMap.size(); } }