修改代码

This commit is contained in:
孙小云 2026-01-17 15:22:02 +08:00
parent ec4985d819
commit ac95f0a163
4 changed files with 345 additions and 53 deletions

View File

@ -20,6 +20,23 @@ public interface IThingsBoardDomain {
*/ */
Iterable<List<DeviceInfo>> getAllDevices(); Iterable<List<DeviceInfo>> getAllDevices();
/**
* 获取所有网关设备的迭代器
* 每次迭代返回一页网关设备列表
* 只返回 additionalInfo.gateway = true 的设备
*
* @return 网关设备迭代器
*/
Iterable<List<DeviceInfo>> getAllGatewayDevices();
/**
* 根据设备ID获取设备信息
*
* @param deviceId 设备ID
* @return 设备信息如果设备不存在则返回 null
*/
DeviceInfo getDeviceInfo(String deviceId);
/** /**
* 根据设备ID获取设备的所有属性 * 根据设备ID获取设备的所有属性
* 只返回已注册的属性键对应的数据未注册的键会被忽略 * 只返回已注册的属性键对应的数据未注册的键会被忽略
@ -64,4 +81,13 @@ public interface IThingsBoardDomain {
* @return 网关设备ID如果设备不属于任何网关则返回 null * @return 网关设备ID如果设备不属于任何网关则返回 null
*/ */
String getDeviceGatewayId(String deviceId); String getDeviceGatewayId(String deviceId);
/**
* 获取网关设备的所有子设备ID列表
* 通过 ThingsBoard EntityRelation 查询网关的 "Contains" 关系
*
* @param gatewayDeviceId 网关设备ID
* @return 子设备ID列表如果网关没有子设备则返回空列表
*/
List<String> getGatewayChildDevices(String gatewayDeviceId);
} }

View File

@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.rest.client.RestClient; import org.thingsboard.rest.client.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ -18,6 +19,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -51,6 +53,34 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain {
return new DeviceIterator(client, pageSize); return new DeviceIterator(client, pageSize);
} }
@Override
public Iterable<List<DeviceInfo>> getAllGatewayDevices() {
return new GatewayDeviceIterator(client, pageSize);
}
@Override
public DeviceInfo getDeviceInfo(String deviceId) {
try {
DeviceId id = new DeviceId(UUID.fromString(deviceId));
Optional<Device> deviceOptional = client.getDeviceById(id);
if (deviceOptional.isEmpty()) {
log.warn("设备不存在: deviceId={}", deviceId);
return null;
}
return new DeviceInfo(
deviceOptional.get().getName(),
deviceOptional.get().getId().getId().toString(),
deviceOptional.get().getType(),
deviceOptional.get().getId(),
deviceOptional.get().getAdditionalInfo()
);
} catch (Exception e) {
log.error("获取设备信息失败: deviceId={}, error={}", deviceId, e.getMessage(), e);
return null;
}
}
@Override @Override
public AttributeMap getDeviceAttributes(String deviceId) { public AttributeMap getDeviceAttributes(String deviceId) {
AttributeMap attributeMap = new AttributeMap(); AttributeMap attributeMap = new AttributeMap();
@ -206,6 +236,41 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain {
} }
} }
@Override
public List<String> getGatewayChildDevices(String gatewayDeviceId) {
List<String> childDeviceIds = new ArrayList<>();
try {
DeviceId gatewayId = new DeviceId(UUID.fromString(gatewayDeviceId));
// 查询从网关出发的 "Contains" 关系网关 -> 子设备
List<EntityRelation> relations = client.findByFrom(
gatewayId,
EntityRelation.CONTAINS_TYPE,
RelationTypeGroup.COMMON
);
if (relations == null || relations.isEmpty()) {
log.debug("网关 {} 没有子设备", gatewayDeviceId);
return childDeviceIds;
}
// 提取所有子设备的ID
for (EntityRelation relation : relations) {
EntityId childEntityId = relation.getTo();
String childDeviceId = childEntityId.getId().toString();
childDeviceIds.add(childDeviceId);
}
log.debug("网关 {} 有 {} 个子设备", gatewayDeviceId, childDeviceIds.size());
return childDeviceIds;
} catch (Exception e) {
log.error("获取网关子设备失败: gatewayDeviceId={}, error={}", gatewayDeviceId, e.getMessage(), e);
return childDeviceIds;
}
}
/** /**
* 解析属性并添加到AttributeMap * 解析属性并添加到AttributeMap
* 使用延迟注册机制自动处理所有属性 * 使用延迟注册机制自动处理所有属性

View File

@ -0,0 +1,106 @@
package com.ruoyi.device.domain.model.thingsboard;
import org.thingsboard.rest.client.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
/**
* 网关设备迭代器
* 支持分页遍历所有网关设备
* 优化策略自动跳过没有网关设备的页只返回包含网关设备的批次
*
* 注意由于ThingsBoard不支持服务端过滤网关设备
* 本迭代器会在客户端过滤并自动跳过空结果页
*/
public class GatewayDeviceIterator implements Iterable<List<DeviceInfo>> {
private final RestClient client;
private final int pageSize;
public GatewayDeviceIterator(RestClient client, int pageSize) {
this.client = client;
this.pageSize = pageSize;
}
@Override
public Iterator<List<DeviceInfo>> iterator() {
return new Iterator<List<DeviceInfo>>() {
private PageLink pageLink = new PageLink(pageSize);
private List<DeviceInfo> nextBatch = null;
private boolean hasMorePages = true;
@Override
public boolean hasNext() {
// 如果已经有预加载的数据直接返回true
if (nextBatch != null && !nextBatch.isEmpty()) {
return true;
}
// 尝试加载下一批网关设备
loadNextBatch();
// 检查是否成功加载到数据
return nextBatch != null && !nextBatch.isEmpty();
}
@Override
public List<DeviceInfo> next() {
if (!hasNext()) {
throw new NoSuchElementException("No more gateway devices");
}
// 返回预加载的数据
List<DeviceInfo> result = nextBatch;
nextBatch = null; // 清空下次调用hasNext时会重新加载
return result;
}
/**
* 加载下一批网关设备
* 会自动跳过没有网关设备的页直到找到至少一个网关设备或没有更多页
*/
private void loadNextBatch() {
nextBatch = null;
// 循环直到找到至少一个网关设备或者没有更多页
while (hasMorePages && (nextBatch == null || nextBatch.isEmpty())) {
// 获取当前页数据
PageData<Device> currentPage = client.getTenantDevices("", pageLink);
// 转换为DeviceInfo列表并过滤出网关设备
nextBatch = currentPage.getData().stream()
.filter(device -> {
// 检查 additionalInfo 中的 gateway 字段
if (device.getAdditionalInfo() == null) {
return false;
}
return device.getAdditionalInfo().has("gateway")
&& device.getAdditionalInfo().get("gateway").asBoolean();
})
.map(device -> new DeviceInfo(
device.getName(),
device.getId().getId().toString(),
device.getType(),
device.getId(),
device.getAdditionalInfo()
))
.collect(Collectors.toList());
// 准备下一页
if (currentPage.hasNext()) {
pageLink = pageLink.nextPageLink();
} else {
hasMorePages = false;
}
}
}
};
}
}

View File

@ -22,7 +22,9 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -117,11 +119,62 @@ public class SynService {
return false; return false;
} }
/**
* 建立设备到网关的映射关系
* 优化策略先识别所有网关设备然后为每个网关查询其子设备
*
* @return 设备ID到网关ID的映射 Map<子设备ID, 网关ID>
*/
private Map<String, String> buildDeviceToGatewayMap() {
Map<String, String> deviceToGatewayMap = new HashMap<>();
try {
// 获取所有设备
Iterable<List<DeviceInfo>> allDevices = iThingsBoardDomain.getAllDevices();
List<DeviceInfo> gatewayDevices = new ArrayList<>();
// 第一步识别所有网关设备
for (List<DeviceInfo> deviceBatch : allDevices) {
for (DeviceInfo deviceInfo : deviceBatch) {
if (deviceInfo.isGateway()) {
gatewayDevices.add(deviceInfo);
}
}
}
log.info("发现 {} 个网关设备", gatewayDevices.size());
// 第二步为每个网关查询其子设备建立映射关系
for (DeviceInfo gateway : gatewayDevices) {
String gatewayId = gateway.getId();
List<String> childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayId);
for (String childDeviceId : childDeviceIds) {
deviceToGatewayMap.put(childDeviceId, gatewayId);
}
log.debug("网关 {} 有 {} 个子设备", gateway.getName(), childDeviceIds.size());
}
log.info("成功建立设备到网关的映射关系,共 {} 个子设备", deviceToGatewayMap.size());
} catch (Exception e) {
log.error("建立设备到网关映射关系失败: {}", e.getMessage(), e);
}
return deviceToGatewayMap;
}
/** /**
* 定时任务同步基础表数据 * 定时任务同步基础表数据
* 执行时间启动后1分钟开始每2分钟执行一次可通过配置文件修改 * 执行时间启动后1分钟开始每2分钟执行一次可通过配置文件修改
* 配置项device.schedule.print-devices.initial-delay 初始延迟时间毫秒 * 配置项device.schedule.print-devices.initial-delay 初始延迟时间毫秒
* device.schedule.print-devices.fixed-delay 执行间隔时间毫秒 * device.schedule.print-devices.fixed-delay 执行间隔时间毫秒
*
* 优化策略
* 1. 先识别所有网关设备
* 2. 遍历每个网关同步网关及其子设备
* 3. 同步非网关的独立设备机场和无人机
*/ */
@Scheduled(initialDelayString = "${device.schedule.update-devices.initial-delay:60000}", @Scheduled(initialDelayString = "${device.schedule.update-devices.initial-delay:60000}",
fixedDelayString = "${device.schedule.update-devices.fixed-delay:120000}") fixedDelayString = "${device.schedule.update-devices.fixed-delay:120000}")
@ -129,66 +182,112 @@ public class SynService {
try { try {
log.info("========== 开始执行定时任务:同步基础表数据 =========="); log.info("========== 开始执行定时任务:同步基础表数据 ==========");
Iterable<List<DeviceInfo>> allDevices = iThingsBoardDomain.getAllDevices();
int totalCount = 0; int totalCount = 0;
int skippedCount = 0;
for (List<DeviceInfo> deviceBatch : allDevices) { // 第一步直接获取所有网关设备服务端过滤
for (DeviceInfo deviceInfo : deviceBatch) { Iterable<List<DeviceInfo>> gatewayDevices = iThingsBoardDomain.getAllGatewayDevices();
// 通过设备名字,过滤出不需要同步的一些设备,比如
// 设备名字是 THJSQ03A2302FAWQZBJ1 THJSQ03A23045BLPGYC5
// 支持配置简单的正则表达式;该正则表达式需要维护在配置文件中
// 检查设备名称是否应该被过滤
if (shouldExcludeDevice(deviceInfo.getName())) {
log.info("设备 {} 匹配过滤规则,跳过同步", deviceInfo.getName());
continue;
}
// 第二步遍历每个网关同步网关及其子设备
for (List<DeviceInfo> gatewayBatch : gatewayDevices) {
for (DeviceInfo gatewayInfo : gatewayBatch) {
try { try {
// 获取设备属性 // 按照名字过滤网关设备
AttributeMap attributes = iThingsBoardDomain.getDeviceAttributes(deviceInfo.getId()); if (shouldExcludeDevice(gatewayInfo.getName())) {
log.info("网关 {} 匹配过滤规则,跳过同步", gatewayInfo.getName());
// 判断设备类型 skippedCount++;
DeviceType deviceType = determineDeviceType(deviceInfo, attributes); continue;
// 同步设备表
Long deviceId = syncDevice(deviceInfo, attributes, deviceType);
// 根据设备类型进行不同的处理
if (deviceType == DeviceType.DOCK) {
// 机场同步机场表
syncDock(deviceId, deviceInfo.getName());
// 获取机场挂载的无人机SN号
Optional<String> subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN);
if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) {
String aircraftSn = subDeviceSnOpt.get();
// 通过SN号查找无人机设备
Device aircraftDevice = findDeviceBySn(aircraftSn);
if (aircraftDevice != null) {
// 同步机场无人机关联
syncDockAircraft(deviceId, aircraftDevice.getDeviceId());
}
}
} else if (deviceType == DeviceType.AIRCRAFT) {
// 无人机同步无人机表
syncAircraft(deviceId, deviceInfo.getName());
} }
// 网关类型不需要额外处理
// 同步网关设备本身
log.info("开始同步网关: {}", gatewayInfo.getName());
syncDevice(gatewayInfo, DeviceType.GATEWAY, null);
totalCount++; totalCount++;
// 获取该网关的所有子设备ID
List<String> childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayInfo.getId());
log.info("网关 {} 有 {} 个子设备", gatewayInfo.getName(), childDeviceIds.size());
// 遍历该网关的子设备
for (String childDeviceId : childDeviceIds) {
// 通过API获取子设备信息
DeviceInfo childDeviceInfo = iThingsBoardDomain.getDeviceInfo(childDeviceId);
if (childDeviceInfo == null) {
log.warn("子设备 {} 不存在,跳过", childDeviceId);
skippedCount++;
continue;
}
// 按照名字过滤子设备
if (shouldExcludeDevice(childDeviceInfo.getName())) {
log.info("子设备 {} 匹配过滤规则,跳过同步", childDeviceInfo.getName());
skippedCount++;
continue;
}
try {
// 获取子设备属性
AttributeMap attributes = iThingsBoardDomain.getDeviceAttributes(childDeviceId);
// 判断设备类型
DeviceType deviceType = determineDeviceType(childDeviceInfo, attributes);
// 同步子设备传入网关ID
Long deviceId = syncDevice(childDeviceInfo, deviceType, gatewayInfo.getId());
// 根据设备类型进行不同的处理
syncDeviceByType(deviceId, childDeviceInfo.getName(), deviceType, attributes);
totalCount++;
} catch (Exception e) {
log.error("同步网关子设备失败: gatewayName={}, childDeviceId={}, error={}",
gatewayInfo.getName(), childDeviceId, e.getMessage(), e);
}
}
} catch (Exception e) { } catch (Exception e) {
log.error("同步设备失败: deviceId={}, deviceName={}, error={}", log.error("同步网关失败: gatewayName={}, error={}",
deviceInfo.getId(), deviceInfo.getName(), e.getMessage(), e); gatewayInfo.getName(), e.getMessage(), e);
} }
} }
} }
log.info("========== 数据同步任务完成,共同步 {} 个设备 ==========", totalCount); log.info("========== 数据同步任务完成,共同步 {} 个设备,跳过 {} 个设备 ==========", totalCount, skippedCount);
} catch (Exception e) { } catch (Exception e) {
log.error("数据同步任务执行失败: {}", e.getMessage(), e); log.error("数据同步任务执行失败: {}", e.getMessage(), e);
} }
} }
/**
* 根据设备类型进行不同的同步处理
*
* @param deviceId 设备主键ID
* @param deviceName 设备名称
* @param deviceType 设备类型
* @param attributes 设备属性
*/
private void syncDeviceByType(Long deviceId, String deviceName, DeviceType deviceType, AttributeMap attributes) {
if (deviceType == DeviceType.DOCK) {
// 机场同步机场表
syncDock(deviceId, deviceName);
// 获取机场挂载的无人机SN号
Optional<String> subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN);
if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) {
String aircraftSn = subDeviceSnOpt.get();
Device aircraftDevice = findDeviceBySn(aircraftSn);
if (aircraftDevice != null) {
syncDockAircraft(deviceId, aircraftDevice.getDeviceId());
}
}
} else if (deviceType == DeviceType.AIRCRAFT) {
// 无人机同步无人机表
syncAircraft(deviceId, deviceName);
}
// 网关类型不需要额外处理
}
/** /**
* 判断设备类型 * 判断设备类型
* 优化后的判断逻辑 * 优化后的判断逻辑
@ -225,17 +324,13 @@ public class SynService {
* 同步设备数据 * 同步设备数据
* *
* @param deviceInfo ThingsBoard设备信息 * @param deviceInfo ThingsBoard设备信息
* @param attributes 设备属性
* @param deviceType 设备类型 * @param deviceType 设备类型
* @param gatewayId 网关设备ID从映射中获取避免重复API调用
* @return 设备主键ID * @return 设备主键ID
*/ */
private Long syncDevice(DeviceInfo deviceInfo, AttributeMap attributes, DeviceType deviceType) { private Long syncDevice(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId) {
String iotDeviceId = deviceInfo.getId(); String iotDeviceId = deviceInfo.getId();
String deviceName = deviceInfo.getName(); String deviceName = deviceInfo.getName();
// 使用 ThingsBoard 标准方式获取网关设备ID通过 EntityRelation
String gateway = iThingsBoardDomain.getDeviceGatewayId(iotDeviceId);
String deviceSn = deviceName; // 使用设备名称作为SN号,网关其实是没有SN号的 String deviceSn = deviceName; // 使用设备名称作为SN号,网关其实是没有SN号的
// 查询设备是否已存在 // 查询设备是否已存在
@ -248,7 +343,7 @@ public class SynService {
newDevice.setIotDeviceId(iotDeviceId); newDevice.setIotDeviceId(iotDeviceId);
newDevice.setDeviceType(deviceType.getCode()); newDevice.setDeviceType(deviceType.getCode());
newDevice.setDeviceSn(deviceSn); newDevice.setDeviceSn(deviceSn);
newDevice.setGateway(gateway); newDevice.setGateway(gatewayId);
newDevice.setCreateBy("system"); newDevice.setCreateBy("system");
deviceDomain.insertDevice(newDevice); deviceDomain.insertDevice(newDevice);
@ -270,8 +365,8 @@ public class SynService {
existingDevice.setDeviceSn(deviceSn); existingDevice.setDeviceSn(deviceSn);
needUpdate = true; needUpdate = true;
} }
if (!Objects.equals(existingDevice.getGateway(), gateway)) { if (!Objects.equals(existingDevice.getGateway(), gatewayId)) {
existingDevice.setGateway(gateway); existingDevice.setGateway(gatewayId);
needUpdate = true; needUpdate = true;
} }