From ac95f0a163fc023140671120a2dd549cc5b96ae3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E5=B0=8F=E4=BA=91?= Date: Sat, 17 Jan 2026 15:22:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/domain/api/IThingsBoardDomain.java | 26 +++ .../domain/impl/ThingsBoardDomainImpl.java | 65 ++++++ .../thingsboard/GatewayDeviceIterator.java | 106 +++++++++ .../ruoyi/device/service/impl/SynService.java | 201 +++++++++++++----- 4 files changed, 345 insertions(+), 53 deletions(-) create mode 100644 src/main/java/com/ruoyi/device/domain/model/thingsboard/GatewayDeviceIterator.java diff --git a/src/main/java/com/ruoyi/device/domain/api/IThingsBoardDomain.java b/src/main/java/com/ruoyi/device/domain/api/IThingsBoardDomain.java index 8e97c8d..c55d119 100644 --- a/src/main/java/com/ruoyi/device/domain/api/IThingsBoardDomain.java +++ b/src/main/java/com/ruoyi/device/domain/api/IThingsBoardDomain.java @@ -20,6 +20,23 @@ public interface IThingsBoardDomain { */ Iterable> getAllDevices(); + /** + * 获取所有网关设备的迭代器 + * 每次迭代返回一页网关设备列表 + * 只返回 additionalInfo.gateway = true 的设备 + * + * @return 网关设备迭代器 + */ + Iterable> getAllGatewayDevices(); + + /** + * 根据设备ID获取设备信息 + * + * @param deviceId 设备ID + * @return 设备信息,如果设备不存在则返回 null + */ + DeviceInfo getDeviceInfo(String deviceId); + /** * 根据设备ID获取设备的所有属性 * 只返回已注册的属性键对应的数据,未注册的键会被忽略 @@ -64,4 +81,13 @@ public interface IThingsBoardDomain { * @return 网关设备ID,如果设备不属于任何网关则返回 null */ String getDeviceGatewayId(String deviceId); + + /** + * 获取网关设备的所有子设备ID列表 + * 通过 ThingsBoard 的 EntityRelation 查询网关的 "Contains" 关系 + * + * @param gatewayDeviceId 网关设备ID + * @return 子设备ID列表,如果网关没有子设备则返回空列表 + */ + List getGatewayChildDevices(String gatewayDeviceId); } \ No newline at end of file diff --git a/src/main/java/com/ruoyi/device/domain/impl/ThingsBoardDomainImpl.java b/src/main/java/com/ruoyi/device/domain/impl/ThingsBoardDomainImpl.java index 7263b23..85ec8c9 100644 --- a/src/main/java/com/ruoyi/device/domain/impl/ThingsBoardDomainImpl.java +++ b/src/main/java/com/ruoyi/device/domain/impl/ThingsBoardDomainImpl.java @@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.rest.client.RestClient; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -18,6 +19,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -51,6 +53,34 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain { return new DeviceIterator(client, pageSize); } + @Override + public Iterable> getAllGatewayDevices() { + return new GatewayDeviceIterator(client, pageSize); + } + + @Override + public DeviceInfo getDeviceInfo(String deviceId) { + try { + DeviceId id = new DeviceId(UUID.fromString(deviceId)); + Optional deviceOptional = client.getDeviceById(id); + if (deviceOptional.isEmpty()) { + log.warn("设备不存在: deviceId={}", deviceId); + return null; + } + + return new DeviceInfo( + deviceOptional.get().getName(), + deviceOptional.get().getId().getId().toString(), + deviceOptional.get().getType(), + deviceOptional.get().getId(), + deviceOptional.get().getAdditionalInfo() + ); + } catch (Exception e) { + log.error("获取设备信息失败: deviceId={}, error={}", deviceId, e.getMessage(), e); + return null; + } + } + @Override public AttributeMap getDeviceAttributes(String deviceId) { AttributeMap attributeMap = new AttributeMap(); @@ -206,6 +236,41 @@ public class ThingsBoardDomainImpl implements IThingsBoardDomain { } } + @Override + public List getGatewayChildDevices(String gatewayDeviceId) { + List childDeviceIds = new ArrayList<>(); + + try { + DeviceId gatewayId = new DeviceId(UUID.fromString(gatewayDeviceId)); + + // 查询从网关出发的 "Contains" 关系(网关 -> 子设备) + List relations = client.findByFrom( + gatewayId, + EntityRelation.CONTAINS_TYPE, + RelationTypeGroup.COMMON + ); + + if (relations == null || relations.isEmpty()) { + log.debug("网关 {} 没有子设备", gatewayDeviceId); + return childDeviceIds; + } + + // 提取所有子设备的ID + for (EntityRelation relation : relations) { + EntityId childEntityId = relation.getTo(); + String childDeviceId = childEntityId.getId().toString(); + childDeviceIds.add(childDeviceId); + } + + log.debug("网关 {} 有 {} 个子设备", gatewayDeviceId, childDeviceIds.size()); + return childDeviceIds; + + } catch (Exception e) { + log.error("获取网关子设备失败: gatewayDeviceId={}, error={}", gatewayDeviceId, e.getMessage(), e); + return childDeviceIds; + } + } + /** * 解析属性并添加到AttributeMap * 使用延迟注册机制,自动处理所有属性 diff --git a/src/main/java/com/ruoyi/device/domain/model/thingsboard/GatewayDeviceIterator.java b/src/main/java/com/ruoyi/device/domain/model/thingsboard/GatewayDeviceIterator.java new file mode 100644 index 0000000..4620b4d --- /dev/null +++ b/src/main/java/com/ruoyi/device/domain/model/thingsboard/GatewayDeviceIterator.java @@ -0,0 +1,106 @@ +package com.ruoyi.device.domain.model.thingsboard; + +import org.thingsboard.rest.client.RestClient; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +/** + * 网关设备迭代器 + * 支持分页遍历所有网关设备 + * 优化策略:自动跳过没有网关设备的页,只返回包含网关设备的批次 + * + * 注意:由于ThingsBoard不支持服务端过滤网关设备, + * 本迭代器会在客户端过滤,并自动跳过空结果页 + */ +public class GatewayDeviceIterator implements Iterable> { + + private final RestClient client; + private final int pageSize; + + public GatewayDeviceIterator(RestClient client, int pageSize) { + this.client = client; + this.pageSize = pageSize; + } + + @Override + public Iterator> iterator() { + return new Iterator>() { + private PageLink pageLink = new PageLink(pageSize); + private List nextBatch = null; + private boolean hasMorePages = true; + + @Override + public boolean hasNext() { + // 如果已经有预加载的数据,直接返回true + if (nextBatch != null && !nextBatch.isEmpty()) { + return true; + } + + // 尝试加载下一批网关设备 + loadNextBatch(); + + // 检查是否成功加载到数据 + return nextBatch != null && !nextBatch.isEmpty(); + } + + @Override + public List next() { + if (!hasNext()) { + throw new NoSuchElementException("No more gateway devices"); + } + + // 返回预加载的数据 + List result = nextBatch; + nextBatch = null; // 清空,下次调用hasNext时会重新加载 + + return result; + } + + /** + * 加载下一批网关设备 + * 会自动跳过没有网关设备的页,直到找到至少一个网关设备或没有更多页 + */ + private void loadNextBatch() { + nextBatch = null; + + // 循环直到找到至少一个网关设备,或者没有更多页 + while (hasMorePages && (nextBatch == null || nextBatch.isEmpty())) { + // 获取当前页数据 + PageData currentPage = client.getTenantDevices("", pageLink); + + // 转换为DeviceInfo列表,并过滤出网关设备 + nextBatch = currentPage.getData().stream() + .filter(device -> { + // 检查 additionalInfo 中的 gateway 字段 + if (device.getAdditionalInfo() == null) { + return false; + } + return device.getAdditionalInfo().has("gateway") + && device.getAdditionalInfo().get("gateway").asBoolean(); + }) + .map(device -> new DeviceInfo( + device.getName(), + device.getId().getId().toString(), + device.getType(), + device.getId(), + device.getAdditionalInfo() + )) + .collect(Collectors.toList()); + + // 准备下一页 + if (currentPage.hasNext()) { + pageLink = pageLink.nextPageLink(); + } else { + hasMorePages = false; + } + } + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/com/ruoyi/device/service/impl/SynService.java b/src/main/java/com/ruoyi/device/service/impl/SynService.java index 5455782..7668355 100644 --- a/src/main/java/com/ruoyi/device/service/impl/SynService.java +++ b/src/main/java/com/ruoyi/device/service/impl/SynService.java @@ -22,7 +22,9 @@ import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.regex.Pattern; @@ -117,11 +119,62 @@ public class SynService { return false; } + /** + * 建立设备到网关的映射关系 + * 优化策略:先识别所有网关设备,然后为每个网关查询其子设备 + * + * @return 设备ID到网关ID的映射 Map<子设备ID, 网关ID> + */ + private Map buildDeviceToGatewayMap() { + Map deviceToGatewayMap = new HashMap<>(); + + try { + // 获取所有设备 + Iterable> allDevices = iThingsBoardDomain.getAllDevices(); + List gatewayDevices = new ArrayList<>(); + + // 第一步:识别所有网关设备 + for (List deviceBatch : allDevices) { + for (DeviceInfo deviceInfo : deviceBatch) { + if (deviceInfo.isGateway()) { + gatewayDevices.add(deviceInfo); + } + } + } + + log.info("发现 {} 个网关设备", gatewayDevices.size()); + + // 第二步:为每个网关查询其子设备,建立映射关系 + for (DeviceInfo gateway : gatewayDevices) { + String gatewayId = gateway.getId(); + List childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayId); + + for (String childDeviceId : childDeviceIds) { + deviceToGatewayMap.put(childDeviceId, gatewayId); + } + + log.debug("网关 {} 有 {} 个子设备", gateway.getName(), childDeviceIds.size()); + } + + log.info("成功建立设备到网关的映射关系,共 {} 个子设备", deviceToGatewayMap.size()); + + } catch (Exception e) { + log.error("建立设备到网关映射关系失败: {}", e.getMessage(), e); + } + + return deviceToGatewayMap; + } + /** * 定时任务:同步基础表数据 * 执行时间:启动后1分钟开始,每2分钟执行一次(可通过配置文件修改) * 配置项:device.schedule.print-devices.initial-delay 初始延迟时间(毫秒) * device.schedule.print-devices.fixed-delay 执行间隔时间(毫秒) + * + * 优化策略: + * 1. 先识别所有网关设备 + * 2. 遍历每个网关,同步网关及其子设备 + * 3. 同步非网关的独立设备(机场和无人机) */ @Scheduled(initialDelayString = "${device.schedule.update-devices.initial-delay:60000}", fixedDelayString = "${device.schedule.update-devices.fixed-delay:120000}") @@ -129,66 +182,112 @@ public class SynService { try { log.info("========== 开始执行定时任务:同步基础表数据 =========="); - Iterable> allDevices = iThingsBoardDomain.getAllDevices(); int totalCount = 0; + int skippedCount = 0; - for (List deviceBatch : allDevices) { - for (DeviceInfo deviceInfo : deviceBatch) { - // 通过设备名字,过滤出不需要同步的一些设备,比如 - // 设备名字是 THJSQ03A2302FAWQZBJ1 和 THJSQ03A23045BLPGYC5 - // 支持配置简单的正则表达式;该正则表达式需要维护在配置文件中 - - // 检查设备名称是否应该被过滤 - if (shouldExcludeDevice(deviceInfo.getName())) { - log.info("设备 {} 匹配过滤规则,跳过同步", deviceInfo.getName()); - continue; - } + // 第一步:直接获取所有网关设备(服务端过滤) + Iterable> gatewayDevices = iThingsBoardDomain.getAllGatewayDevices(); + // 第二步:遍历每个网关,同步网关及其子设备 + for (List gatewayBatch : gatewayDevices) { + for (DeviceInfo gatewayInfo : gatewayBatch) { try { - // 获取设备属性 - AttributeMap attributes = iThingsBoardDomain.getDeviceAttributes(deviceInfo.getId()); - - // 判断设备类型 - DeviceType deviceType = determineDeviceType(deviceInfo, attributes); - - // 同步设备表 - Long deviceId = syncDevice(deviceInfo, attributes, deviceType); - - // 根据设备类型进行不同的处理 - if (deviceType == DeviceType.DOCK) { - // 机场:同步机场表 - syncDock(deviceId, deviceInfo.getName()); - - // 获取机场挂载的无人机SN号 - Optional subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN); - if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) { - String aircraftSn = subDeviceSnOpt.get(); - // 通过SN号查找无人机设备 - Device aircraftDevice = findDeviceBySn(aircraftSn); - if (aircraftDevice != null) { - // 同步机场无人机关联 - syncDockAircraft(deviceId, aircraftDevice.getDeviceId()); - } - } - } else if (deviceType == DeviceType.AIRCRAFT) { - // 无人机:同步无人机表 - syncAircraft(deviceId, deviceInfo.getName()); + // 按照名字过滤网关设备 + if (shouldExcludeDevice(gatewayInfo.getName())) { + log.info("网关 {} 匹配过滤规则,跳过同步", gatewayInfo.getName()); + skippedCount++; + continue; } - // 网关类型不需要额外处理 + + // 同步网关设备本身 + log.info("开始同步网关: {}", gatewayInfo.getName()); + syncDevice(gatewayInfo, DeviceType.GATEWAY, null); totalCount++; + + // 获取该网关的所有子设备ID + List childDeviceIds = iThingsBoardDomain.getGatewayChildDevices(gatewayInfo.getId()); + log.info("网关 {} 有 {} 个子设备", gatewayInfo.getName(), childDeviceIds.size()); + + // 遍历该网关的子设备 + for (String childDeviceId : childDeviceIds) { + // 通过API获取子设备信息 + DeviceInfo childDeviceInfo = iThingsBoardDomain.getDeviceInfo(childDeviceId); + if (childDeviceInfo == null) { + log.warn("子设备 {} 不存在,跳过", childDeviceId); + skippedCount++; + continue; + } + + // 按照名字过滤子设备 + if (shouldExcludeDevice(childDeviceInfo.getName())) { + log.info("子设备 {} 匹配过滤规则,跳过同步", childDeviceInfo.getName()); + skippedCount++; + continue; + } + + try { + // 获取子设备属性 + AttributeMap attributes = iThingsBoardDomain.getDeviceAttributes(childDeviceId); + + // 判断设备类型 + DeviceType deviceType = determineDeviceType(childDeviceInfo, attributes); + + // 同步子设备(传入网关ID) + Long deviceId = syncDevice(childDeviceInfo, deviceType, gatewayInfo.getId()); + + // 根据设备类型进行不同的处理 + syncDeviceByType(deviceId, childDeviceInfo.getName(), deviceType, attributes); + + totalCount++; + + } catch (Exception e) { + log.error("同步网关子设备失败: gatewayName={}, childDeviceId={}, error={}", + gatewayInfo.getName(), childDeviceId, e.getMessage(), e); + } + } + } catch (Exception e) { - log.error("同步设备失败: deviceId={}, deviceName={}, error={}", - deviceInfo.getId(), deviceInfo.getName(), e.getMessage(), e); + log.error("同步网关失败: gatewayName={}, error={}", + gatewayInfo.getName(), e.getMessage(), e); } } } - log.info("========== 数据同步任务完成,共同步 {} 个设备 ==========", totalCount); + log.info("========== 数据同步任务完成,共同步 {} 个设备,跳过 {} 个设备 ==========", totalCount, skippedCount); } catch (Exception e) { log.error("数据同步任务执行失败: {}", e.getMessage(), e); } } + /** + * 根据设备类型进行不同的同步处理 + * + * @param deviceId 设备主键ID + * @param deviceName 设备名称 + * @param deviceType 设备类型 + * @param attributes 设备属性 + */ + private void syncDeviceByType(Long deviceId, String deviceName, DeviceType deviceType, AttributeMap attributes) { + if (deviceType == DeviceType.DOCK) { + // 机场:同步机场表 + syncDock(deviceId, deviceName); + + // 获取机场挂载的无人机SN号 + Optional subDeviceSnOpt = attributes.get(DeviceAttributes.SUB_DEVICE_SN); + if (subDeviceSnOpt.isPresent() && StringUtils.hasText(subDeviceSnOpt.get())) { + String aircraftSn = subDeviceSnOpt.get(); + Device aircraftDevice = findDeviceBySn(aircraftSn); + if (aircraftDevice != null) { + syncDockAircraft(deviceId, aircraftDevice.getDeviceId()); + } + } + } else if (deviceType == DeviceType.AIRCRAFT) { + // 无人机:同步无人机表 + syncAircraft(deviceId, deviceName); + } + // 网关类型不需要额外处理 + } + /** * 判断设备类型 * 优化后的判断逻辑: @@ -225,17 +324,13 @@ public class SynService { * 同步设备数据 * * @param deviceInfo ThingsBoard设备信息 - * @param attributes 设备属性 * @param deviceType 设备类型 + * @param gatewayId 网关设备ID(从映射中获取,避免重复API调用) * @return 设备主键ID */ - private Long syncDevice(DeviceInfo deviceInfo, AttributeMap attributes, DeviceType deviceType) { + private Long syncDevice(DeviceInfo deviceInfo, DeviceType deviceType, String gatewayId) { String iotDeviceId = deviceInfo.getId(); String deviceName = deviceInfo.getName(); - - // 使用 ThingsBoard 标准方式获取网关设备ID(通过 EntityRelation) - String gateway = iThingsBoardDomain.getDeviceGatewayId(iotDeviceId); - String deviceSn = deviceName; // 使用设备名称作为SN号,网关其实是没有SN号的 // 查询设备是否已存在 @@ -248,7 +343,7 @@ public class SynService { newDevice.setIotDeviceId(iotDeviceId); newDevice.setDeviceType(deviceType.getCode()); newDevice.setDeviceSn(deviceSn); - newDevice.setGateway(gateway); + newDevice.setGateway(gatewayId); newDevice.setCreateBy("system"); deviceDomain.insertDevice(newDevice); @@ -270,8 +365,8 @@ public class SynService { existingDevice.setDeviceSn(deviceSn); needUpdate = true; } - if (!Objects.equals(existingDevice.getGateway(), gateway)) { - existingDevice.setGateway(gateway); + if (!Objects.equals(existingDevice.getGateway(), gatewayId)) { + existingDevice.setGateway(gatewayId); needUpdate = true; }