a-tuoheng-device/src/main/java/com/ruoyi/device/service/impl/SynService.java

661 lines
28 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.ruoyi.device.service.impl;
import com.ruoyi.device.domain.api.*;
import com.ruoyi.device.domain.model.*;
import com.ruoyi.device.domain.model.thingsboard.AttributeMap;
import com.ruoyi.device.domain.model.thingsboard.DeviceInfo;
import com.ruoyi.device.domain.model.thingsboard.TelemetryMap;
import com.ruoyi.device.domain.model.thingsboard.TelemetryValue;
import com.ruoyi.device.domain.model.thingsboard.attributes.psdk.PsdkDevice;
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceAttributes;
import com.ruoyi.device.domain.model.thingsboard.constants.DeviceTelemetry;
import com.ruoyi.device.api.enums.PayloadTypeEnum;
import com.ruoyi.device.service.enums.DeviceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.regex.Pattern;
@Service
public class SynService {
private static final Logger log = LoggerFactory.getLogger(SynService.class);
private final IThingsBoardDomain iThingsBoardDomain;
/**
* 设备名称过滤正则表达式列表(从配置文件读取)
* 匹配这些正则表达式的设备将被跳过,不进行同步
*/
@Value("${device.sync.exclude-patterns:}")
private String excludePatterns;
/**
* 编译后的正则表达式模式列表(延迟初始化)
*/
private List<Pattern> excludePatternList;
@Autowired
private IDeviceDomain deviceDomain;
@Autowired
private IDockDomain dockDomain;
@Autowired
private IAircraftDomain aircraftDomain;
@Autowired
private IDockAircraftDomain dockAircraftDomain;
@Autowired
private IPayloadDomain payloadDomain;
@Autowired
private IAircraftPayloadDomain aircraftPayloadDomain;
@Autowired
private IGroupDomain groupDomain;
@Autowired
private IDockGroupDomain dockGroupDomain;
public SynService(IThingsBoardDomain iThingsBoardDomain) {
this.iThingsBoardDomain = iThingsBoardDomain;
}
/**
* 初始化设备名称过滤正则表达式列表
* 延迟初始化,在第一次使用时编译正则表达式
*/
private void initExcludePatterns() {
if (excludePatternList == null) {
excludePatternList = new ArrayList<>();
if (StringUtils.hasText(excludePatterns)) {
String[] patterns = excludePatterns.split(",");
for (String pattern : patterns) {
String trimmedPattern = pattern.trim();
if (StringUtils.hasText(trimmedPattern)) {
try {
excludePatternList.add(Pattern.compile(trimmedPattern));
log.info("加载设备名称过滤规则: {}", trimmedPattern);
} catch (Exception e) {
log.error("无效的正则表达式: {}, error={}", trimmedPattern, e.getMessage());
}
}
}
}
if (excludePatternList.isEmpty()) {
log.info("未配置设备名称过滤规则");
}
}
}
/**
* 判断设备名称是否应该被过滤(跳过同步)
*
* @param deviceName 设备名称
* @return true 表示应该被过滤false 表示不过滤
*/
private boolean shouldExcludeDevice(String deviceName) {
// 延迟初始化
initExcludePatterns();
// 如果没有配置过滤规则,不过滤任何设备
if (excludePatternList.isEmpty()) {
return false;
}
// 检查设备名称是否匹配任何过滤规则
for (Pattern pattern : excludePatternList) {
if (pattern.matcher(deviceName).matches()) {
log.debug("设备 {} 匹配过滤规则 {},跳过同步", deviceName, pattern.pattern());
return true;
}
}
return false;
}
/**
* 定时任务:同步基础表数据
* 执行时间启动后1分钟开始每10分钟执行一次可通过配置文件修改
* 配置项device.schedule.print-devices.initial-delay 初始延迟时间(毫秒)
* device.schedule.print-devices.fixed-delay 执行间隔时间(毫秒)
*
* 优化策略:
* 1. 先识别所有网关设备
* 2. 遍历每个网关,同步网关及其子设备
* 3. 同步非网关的独立设备(机场和无人机)
*/
@Scheduled(initialDelayString = "${device.schedule.update-devices.initial-delay:60000}",
fixedDelayString = "${device.schedule.update-devices.fixed-delay:600000}")
public void updateDevicesScheduled() {
try {
log.info("========== 开始执行定时任务:同步基础表数据 ==========");
int totalCount = 0;
int skippedCount = 0;
// 第一步:直接获取所有网关设备(服务端过滤)
Iterable<List<DeviceInfo>> gatewayDevices = iThingsBoardDomain.getAllGatewayDevices();
// 第二步:遍历每个网关,同步网关及其子设备
for (List<DeviceInfo> gatewayBatch : gatewayDevices) {
for (DeviceInfo gatewayInfo : gatewayBatch) {
try {
// 按照名字过滤网关设备
if (shouldExcludeDevice(gatewayInfo.getName())) {
// log.info("网关 {} 匹配过滤规则,跳过同步", gatewayInfo.getName());
skippedCount++;
continue;
}
// 同步网关设备本身
// log.info("开始同步网关: {}", gatewayInfo.getName());
syncDevice(gatewayInfo, DeviceType.GATEWAY, null);
totalCount++;
// 获取该网关的所有子设备ID
List<String> childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayInfo.getId());
// log.info("网关 {} 有 {} 个子设备", gatewayInfo.getName(), childDeviceIds.size());
// 遍历该网关的子设备
for (String childDeviceId : childDeviceIds) {
// 通过API获取子设备信息
DeviceInfo childDeviceInfo = iThingsBoardDomain.getDeviceInfo(childDeviceId);
if (childDeviceInfo == null) {
log.warn("子设备 {} 不存在,跳过", childDeviceId);
skippedCount++;
continue;
}
// 按照名字过滤子设备
if (shouldExcludeDevice(childDeviceInfo.getName())) {
// log.info("子设备 {} 匹配过滤规则,跳过同步", childDeviceInfo.getName());
skippedCount++;
continue;
}
try {
// 获取子设备属性
AttributeMap attributes = iThingsBoardDomain.getPredefinedDeviceAttributes(childDeviceId);
// 判断设备类型
DeviceType deviceType = determineDeviceType(childDeviceInfo, attributes);
// 同步子设备传入网关ID
Long deviceId = syncDevice(childDeviceInfo, deviceType, gatewayInfo.getId());
// 根据设备类型进行不同的处理
syncDeviceByType(deviceId, childDeviceInfo.getName(), deviceType, attributes);
totalCount++;
} catch (Exception e) {
log.error("同步网关子设备失败: gatewayName={}, childDeviceId={}, error={}",
gatewayInfo.getName(), childDeviceId, e.getMessage(), e);
}
}
} catch (Exception e) {
log.error("同步网关失败: gatewayName={}, error={}",
gatewayInfo.getName(), e.getMessage(), e);
}
}
}
log.info("========== 数据同步任务完成,共同步 {} 个设备,跳过 {} 个设备 ==========", totalCount, skippedCount);
} catch (Exception e) {
log.error("数据同步任务执行失败: {}", e.getMessage(), e);
}
}
/**
* 根据设备类型进行不同的同步处理
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
* @param deviceType 设备类型
* @param attributes 设备属性
*/
private void syncDeviceByType(Long deviceId, String deviceName, DeviceType deviceType, AttributeMap attributes) {
if (deviceType == DeviceType.DOCK) {
// 机场:同步机场表
syncDock(deviceId, deviceName);
// 获取机场挂载的无人机SN号
Optional<String> subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN);
log.info("机场 {} 的 sub_device.device_sn 属性: {}", deviceName,
subDeviceSnOpt.isPresent() ? subDeviceSnOpt.get() : "不存在或为空");
if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) {
String aircraftSn = subDeviceSnOpt.get();
log.info("机场 {} 尝试查找无人机: aircraftSn={}", deviceName, aircraftSn);
Device aircraftDevice = findDeviceBySn(aircraftSn);
if (aircraftDevice != null) {
log.info("找到无人机设备: aircraftSn={}, deviceId={}", aircraftSn, aircraftDevice.getDeviceId());
syncDockAircraft(deviceId, aircraftDevice.getDeviceId());
} else {
log.warn("未找到无人机设备: aircraftSn={}, 可能无人机尚未同步", aircraftSn);
}
} else {
log.warn("机场 {} 没有 sub_device.device_sn 属性,无法建立机场-无人机关联", deviceName);
}
} else if (deviceType == DeviceType.AIRCRAFT) {
// 无人机:同步无人机表
syncAircraft(deviceId, deviceName);
}
// 网关类型不需要额外处理
}
/**
* 判断设备类型
* 优化后的判断逻辑:
* 1. 优先使用 ThingsBoard 标准的 additionalInfo.gateway 字段判断网关
* 2. 对于非网关设备,通过 dock_sn 属性区分机场和无人机
*
* @param deviceInfo ThingsBoard设备信息
* @param attributes 设备属性
* @return 设备类型
*/
private DeviceType determineDeviceType(DeviceInfo deviceInfo, AttributeMap attributes) {
String deviceName = deviceInfo.getName();
// 1. 使用 ThingsBoard 标准方式判断网关:检查 additionalInfo 中的 gateway 字段
if (deviceInfo.isGateway()) {
return DeviceType.GATEWAY;
}
// 2. 非网关设备:通过 dock_sn 属性区分机场和无人机
Optional<String> dockSnOpt = attributes.get(DeviceAttributes.DOCK_SN);
if (dockSnOpt.isPresent() && StringUtils.hasText(dockSnOpt.get())) {
String dockSn = dockSnOpt.get();
// dock_sn 等于设备名称 -> 机场
// dock_sn 不等于设备名称 -> 无人机(挂载在该机场下)
return deviceName.equals(dockSn) ? DeviceType.DOCK : DeviceType.AIRCRAFT;
}
// dock_sn 属性不存在或为空,无法判断设备类型
throw new IllegalStateException("无法确定设备类型:设备 " + deviceName + " 缺少 dock_sn 属性");
}
/**
* 同步设备数据
*
* @param deviceInfo ThingsBoard设备信息
* @param deviceType 设备类型
* @param gatewayId 网关设备ID从映射中获取避免重复API调用
* @return 设备主键ID
*/
private Long syncDevice(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId) {
String iotDeviceId = deviceInfo.getId();
String deviceName = deviceInfo.getName();
String deviceSn = deviceName; // 使用设备名称作为SN号,网关其实是没有SN号的
log.info("开始同步设备: iotDeviceId={}, deviceName={}, deviceType={}, gatewayId={}",
iotDeviceId, deviceName, deviceType, gatewayId);
// 查询设备是否已存在
Device existingDevice = deviceDomain.selectDeviceByIotDeviceId(iotDeviceId);
if (existingDevice == null) {
// 设备不存在,插入新设备
Device newDevice = new Device();
newDevice.setDeviceName(deviceName);
newDevice.setIotDeviceId(iotDeviceId);
newDevice.setDeviceType(deviceType.getCode());
newDevice.setDeviceSn(deviceSn);
newDevice.setGateway(gatewayId);
newDevice.setCreateBy("system");
log.info("准备插入新设备: deviceName={}, deviceType={}, deviceSn={}",
deviceName, deviceType, deviceSn);
deviceDomain.insertDevice(newDevice);
Long deviceId = newDevice.getDeviceId();
log.info("插入新设备成功: iotDeviceId={}, deviceName={}, deviceType={}, 返回deviceId={}",
iotDeviceId, deviceName, deviceType, deviceId);
return deviceId;
} else {
// 设备已存在,检查是否需要更新
boolean needUpdate = false;
if (!Objects.equals(existingDevice.getDeviceName(), deviceName)) {
existingDevice.setDeviceName(deviceName);
needUpdate = true;
}
if (!Objects.equals(existingDevice.getDeviceType(), deviceType.getCode())) {
existingDevice.setDeviceType(deviceType.getCode());
needUpdate = true;
}
if (!Objects.equals(existingDevice.getDeviceSn(), deviceSn)) {
existingDevice.setDeviceSn(deviceSn);
needUpdate = true;
}
if (!Objects.equals(existingDevice.getGateway(), gatewayId)) {
existingDevice.setGateway(gatewayId);
needUpdate = true;
}
if (needUpdate) {
existingDevice.setUpdateBy("system");
deviceDomain.updateDevice(existingDevice);
log.info("更新设备: iotDeviceId={}, deviceName={}, deviceType={}", iotDeviceId, deviceName, deviceType);
}
return existingDevice.getDeviceId();
}
}
/**
* 同步机场数据
* 逻辑:
* 1. 如果是新插入的机场需要将其插入到默认分组ID=0
* 2. 如果默认分组不存在,需要先创建默认分组
* 3. 如果是已存在的机场,检查机场是否在任何分组中,如果不在,将其插入默认分组
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
*/
private void syncDock(Long deviceId, String deviceName) {
log.info("开始同步机场: deviceId={}, deviceName={}", deviceId, deviceName);
// 查询机场是否已存在
Dock existingDock = dockDomain.selectDockByDeviceId(deviceId);
boolean isNewDock = (existingDock == null);
Long dockId;
if (isNewDock) {
// 机场不存在,插入新机场
Dock newDock = new Dock();
newDock.setDockName(deviceName);
newDock.setDeviceId(deviceId);
newDock.setCreateBy("system");
log.info("准备插入新机场: deviceId={}, dockName={}", deviceId, deviceName);
dockDomain.insertDock(newDock);
dockId = newDock.getDockId();
log.info("插入新机场成功: deviceId={}, dockName={}, dockId={}",
deviceId, deviceName, dockId);
} else {
dockId = existingDock.getDockId();
log.info("机场已存在: deviceId={}, dockName={}, dockId={}", deviceId, deviceName, dockId);
}
// 确保默认分组存在
ensureDefaultGroupExists();
// 检查机场是否在任何分组中
List<DockGroup> dockGroups = dockGroupDomain.selectDockGroupByDockId(dockId);
if (dockGroups == null || dockGroups.isEmpty()) {
// 机场不在任何分组中,将其插入默认分组
insertDockToDefaultGroup(dockId, deviceName);
} else {
log.info("机场已在分组中: dockId={}, 分组数量={}", dockId, dockGroups.size());
}
}
/**
* 同步无人机数据
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
* @return 无人机主键ID
*/
private Long syncAircraft(Long deviceId, String deviceName) {
log.info("开始同步无人机: deviceId={}, deviceName={}", deviceId, deviceName);
// 查询无人机是否已存在
Aircraft existingAircraft = aircraftDomain.selectAircraftByDeviceId(deviceId);
if (existingAircraft == null) {
// 无人机不存在,插入新无人机
Aircraft newAircraft = new Aircraft();
newAircraft.setAircraftName(deviceName);
newAircraft.setDeviceId(deviceId);
newAircraft.setCreateBy("system");
log.info("准备插入新无人机: deviceId={}, aircraftName={}", deviceId, deviceName);
aircraftDomain.insertAircraft(newAircraft);
Long aircraftId = newAircraft.getAircraftId();
log.info("插入新无人机成功: deviceId={}, aircraftName={}, aircraftId={}",
deviceId, deviceName, aircraftId);
return aircraftId;
} else {
log.info("无人机已存在,跳过插入: deviceId={}, aircraftName={}", deviceId, deviceName);
}
Device device = deviceDomain.selectDeviceByDeviceId(deviceId);
TelemetryMap telemetryMap = iThingsBoardDomain.getPredefinedDeviceTelemetry(device.getIotDeviceId());
Optional<TelemetryValue<List<PsdkDevice>>>
psdkDevicesOption = telemetryMap.get(DeviceTelemetry.PSDK_WIDGET_VALUES);
if(psdkDevicesOption.isPresent()) {
List<PsdkDevice> psdkDevices = psdkDevicesOption.get().getValue();
if(!CollectionUtils.isEmpty(psdkDevices)) {
for (PsdkDevice psdkDevice : psdkDevices) {
if(Objects.nonNull(psdkDevice.getSpeaker()) &&
Objects.nonNull(psdkDevice.getPsdk_sn()) && !psdkDevice.getPsdk_sn().isEmpty()) {
log.info("开始同步PSDK喊话器设备: psdkSn={}, psdkName={}",
psdkDevice.getPsdk_sn(), psdkDevice.getPsdk_name());
// 第一步:同步挂载设备(喊话器)
Payload payload = new Payload();
payload.setPayloadSn(psdkDevice.getPsdk_sn());
List<Payload> payloads = payloadDomain.selectPayloadList(payload);
if(CollectionUtils.isEmpty(payloads)) {
// 挂载不存在,插入新挂载
payload.setPayloadName(psdkDevice.getPsdk_name());
payload.setPayloadDisplayName("喊话器");
payload.setPayloadType(PayloadTypeEnum.SPEAKER.getCode());
payload.setCreateBy("system");
payloadDomain.insertPayload(payload);
log.info("插入新挂载设备: payloadSn={}, payloadName={}, payloadId={}",
psdkDevice.getPsdk_sn(), psdkDevice.getPsdk_name(), payload.getPayloadId());
} else {
// 挂载已存在,使用已有的挂载
payload = payloads.get(0);
log.info("挂载设备已存在: payloadSn={}, payloadId={}",
psdkDevice.getPsdk_sn(), payload.getPayloadId());
}
// 第二步:同步无人机挂载关联表
Long payloadId = payload.getPayloadId();
Long aircraftId = existingAircraft.getAircraftId();
// 获取无人机关联的机场ID
Long dockId = getDockIdByAircraftId(aircraftId);
List<AircraftPayload> aircraftPayloads =
aircraftPayloadDomain.selectAircraftPayloadByPayloadId(payloadId);
if(!CollectionUtils.isEmpty(aircraftPayloads)) {
// 关联已存在,检查是否需要更新
AircraftPayload existingRelation = aircraftPayloads.get(0);
boolean needUpdate = false;
if(!Objects.equals(existingRelation.getAircraftId(), aircraftId)) {
existingRelation.setAircraftId(aircraftId);
needUpdate = true;
}
if(!Objects.equals(existingRelation.getDockId(), dockId)) {
existingRelation.setDockId(dockId);
needUpdate = true;
}
if(needUpdate) {
existingRelation.setUpdateBy("system");
aircraftPayloadDomain.updateAircraftPayload(existingRelation);
log.info("更新无人机挂载关联: aircraftId={}, payloadId={}, dockId={}",
aircraftId, payloadId, dockId);
} else {
log.info("无人机挂载关联无需更新: aircraftId={}, payloadId={}",
aircraftId, payloadId);
}
} else {
// 关联不存在,插入新关联
AircraftPayload newRelation = new AircraftPayload();
newRelation.setAircraftId(aircraftId);
newRelation.setPayloadId(payloadId);
newRelation.setDockId(dockId);
newRelation.setCreateBy("system");
aircraftPayloadDomain.insertAircraftPayload(newRelation);
log.info("插入无人机挂载关联: aircraftId={}, payloadId={}, dockId={}",
aircraftId, payloadId, dockId);
}
}
}
}
}
// 无人机已存在,无需更新
return existingAircraft.getAircraftId();
}
/**
* 根据设备SN号查找设备
*
* @param deviceSn 设备SN号
* @return 设备信息
*/
private Device findDeviceBySn(String deviceSn) {
Device queryDevice = new Device();
queryDevice.setDeviceSn(deviceSn);
List<Device> devices = deviceDomain.selectDeviceList(queryDevice);
return (devices != null && !devices.isEmpty()) ? devices.get(0) : null;
}
/**
* 根据无人机ID获取关联的机场ID
*
* @param aircraftId 无人机主键ID
* @return 机场主键ID如果没有关联则返回null
*/
private Long getDockIdByAircraftId(Long aircraftId) {
List<DockAircraft> dockAircrafts = dockAircraftDomain.selectDockAircraftByAircraftId(aircraftId);
if (dockAircrafts != null && !dockAircrafts.isEmpty()) {
Long dockId = dockAircrafts.get(0).getDockId();
log.debug("无人机 {} 关联的机场ID: {}", aircraftId, dockId);
return dockId;
}
log.debug("无人机 {} 没有关联的机场", aircraftId);
return null;
}
/**
* 同步机场无人机关联数据
* 按照一个机场只会挂载一台无人机的逻辑进行处理
*
* @param dockDeviceId 机场设备主键ID
* @param aircraftDeviceId 无人机设备主键ID
*/
private void syncDockAircraft(Long dockDeviceId, Long aircraftDeviceId) {
log.info("开始同步机场无人机关联: dockDeviceId={}, aircraftDeviceId={}", dockDeviceId, aircraftDeviceId);
// 获取机场主键
Dock dock = dockDomain.selectDockByDeviceId(dockDeviceId);
if (dock == null) {
log.warn("机场不存在,无法同步机场无人机关联: dockDeviceId={}", dockDeviceId);
return;
}
log.info("找到机场: dockId={}, dockName={}", dock.getDockId(), dock.getDockName());
// 获取无人机主键
Aircraft aircraft = aircraftDomain.selectAircraftByDeviceId(aircraftDeviceId);
if (aircraft == null) {
log.warn("无人机不存在,无法同步机场无人机关联: aircraftDeviceId={}", aircraftDeviceId);
return;
}
Long dockId = dock.getDockId();
Long aircraftId = aircraft.getAircraftId();
// 查询该机场是否已有关联
List<DockAircraft> existingRelations = dockAircraftDomain.selectDockAircraftByDockId(dockId);
if (existingRelations == null || existingRelations.isEmpty()) {
// 机场没有关联,插入新关联
DockAircraft newRelation = new DockAircraft();
newRelation.setDockId(dockId);
newRelation.setAircraftId(aircraftId);
newRelation.setCreateBy("system");
dockAircraftDomain.insertDockAircraft(newRelation);
log.info("插入机场无人机关联: dockId={}, aircraftId={}", dockId, aircraftId);
} else {
// 机场已有关联,检查是否需要更新
DockAircraft existingRelation = existingRelations.get(0);
if (!Objects.equals(existingRelation.getAircraftId(), aircraftId)) {
// 无人机发生变化,更新关联
existingRelation.setAircraftId(aircraftId);
existingRelation.setUpdateBy("system");
dockAircraftDomain.updateDockAircraft(existingRelation);
log.info("更新机场无人机关联: dockId=, aircraftId={}", dockId, aircraftId);
}
}
}
/**
* 确保默认分组存在
* 默认分组ID为0名称为"默认分组"
* 如果不存在则创建
*/
private void ensureDefaultGroupExists() {
Long defaultGroupId = 0L;
Group defaultGroup = groupDomain.selectGroupByGroupId(defaultGroupId);
if (defaultGroup == null) {
// 默认分组不存在,创建默认分组
Group newGroup = new Group();
newGroup.setGroupName("默认分组");
newGroup.setCreateBy("system");
newGroup.setRemark("系统自动创建的默认分组,用于存放未分组的机场");
groupDomain.insertGroup(newGroup);
log.info("创建默认分组成功: groupId={}, groupName={}", defaultGroupId, "默认分组");
} else {
log.debug("默认分组已存在: groupId={}, groupName={}", defaultGroupId, defaultGroup.getGroupName());
}
}
/**
* 将机场插入到默认分组
*
* @param dockId 机场主键ID
* @param dockName 机场名称(用于日志)
*/
private void insertDockToDefaultGroup(Long dockId, String dockName) {
Long defaultGroupId = 0L;
DockGroup newDockGroup = new DockGroup();
newDockGroup.setDockId(dockId);
newDockGroup.setGroupId(defaultGroupId);
newDockGroup.setCreateBy("system");
newDockGroup.setRemark("系统自动添加到默认分组");
dockGroupDomain.insertDockGroup(newDockGroup);
log.info("将机场添加到默认分组: dockId={}, dockName={}, groupId={}",
dockId, dockName, defaultGroupId);
}
}