thingsboard-client-demo/src/main/java/com/tuoheng/machine/mqtt/MqttCallbackRegistry.java

126 lines
3.9 KiB
Java
Raw Normal View History

2025-12-17 10:23:45 +08:00
package com.tuoheng.machine.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
/**
* MQTT回调注册中心
* 用于注册和管理MQTT消息的回调处理器
*/
@Slf4j
@Component
public class MqttCallbackRegistry {
/**
* 主题 -> 回调处理器列表
*/
private final Map<String, CopyOnWriteArrayList<MqttCallbackHandler>> topicHandlers = new ConcurrentHashMap<>();
/**
* 回调ID -> 回调处理器
*/
private final Map<String, MqttCallbackHandler> handlerMap = new ConcurrentHashMap<>();
/**
* 注册回调
*
* @param topic 监听的主题
* @param messageHandler 消息处理器
* @param timeoutMs 超时时间毫秒
* @return 回调ID用于取消注册
*/
public String registerCallback(String topic, Consumer<Object> messageHandler, long timeoutMs) {
String callbackId = UUID.randomUUID().toString();
MqttCallbackHandler handler = new MqttCallbackHandler(
callbackId,
topic,
messageHandler,
timeoutMs,
System.currentTimeMillis()
);
topicHandlers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(handler);
handlerMap.put(callbackId, handler);
log.debug("注册MQTT回调: callbackId={}, topic={}, timeoutMs={}", callbackId, topic, timeoutMs);
return callbackId;
}
/**
* 取消注册回调
*
* @param callbackId 回调ID
*/
public void unregisterCallback(String callbackId) {
MqttCallbackHandler handler = handlerMap.remove(callbackId);
if (handler != null) {
CopyOnWriteArrayList<MqttCallbackHandler> handlers = topicHandlers.get(handler.getTopic());
if (handlers != null) {
handlers.remove(handler);
if (handlers.isEmpty()) {
topicHandlers.remove(handler.getTopic());
}
}
log.debug("取消注册MQTT回调: callbackId={}, topic={}", callbackId, handler.getTopic());
}
}
/**
* 处理接收到的MQTT消息
*
* @param topic 主题
* @param messageBody 消息体
*/
public void handleMessage(String topic, Object messageBody) {
CopyOnWriteArrayList<MqttCallbackHandler> handlers = topicHandlers.get(topic);
if (handlers == null || handlers.isEmpty()) {
return;
}
log.debug("处理MQTT消息: topic={}, handlerCount={}", topic, handlers.size());
for (MqttCallbackHandler handler : handlers) {
try {
// 检查是否超时
if (handler.isTimeout()) {
log.warn("MQTT回调已超时: callbackId={}, topic={}", handler.getCallbackId(), topic);
unregisterCallback(handler.getCallbackId());
continue;
}
// 执行回调
handler.getMessageHandler().accept(messageBody);
} catch (Exception e) {
log.error("执行MQTT回调失败: callbackId={}, topic={}", handler.getCallbackId(), topic, e);
}
}
}
/**
* 清理超时的回调
*/
public void cleanupTimeoutCallbacks() {
handlerMap.values().removeIf(handler -> {
if (handler.isTimeout()) {
log.warn("清理超时的MQTT回调: callbackId={}, topic={}", handler.getCallbackId(), handler.getTopic());
unregisterCallback(handler.getCallbackId());
return true;
}
return false;
});
}
/**
* 获取当前注册的回调数量
*/
public int getCallbackCount() {
return handlerMap.size();
}
}