# ThingsBoard 数据存储源码分析 ## 1. 概述 ThingsBoard 的数据存储层采用分层设计,支持多种数据库存储不同类型的任务。实体数据存储在关系型数据库(PostgreSQL),时序数据可以存储在 PostgreSQL/TimescaleDB 或 Cassandra。 ## 2. 数据存储架构 ### 2.1 存储分层 ``` 应用层 ├── TelemetryService (遥测服务) ├── AttributeService (属性服务) └── EntityService (实体服务) ↓ 数据访问层 (DAO) ├── TimeseriesDao (时序数据) ├── AttributeDao (属性数据) └── EntityDao (实体数据) ↓ 数据库层 ├── PostgreSQL (实体数据 + 时序数据) ├── TimescaleDB (时序数据扩展) └── Cassandra (时序数据) ``` ### 2.2 数据分类 1. **实体数据**: 租户、用户、设备、资产等 - 存储位置: PostgreSQL - 特点: 结构化数据,需要事务支持 2. **时序数据**: 设备遥测数据 - 存储位置: PostgreSQL/TimescaleDB 或 Cassandra - 特点: 时间序列数据,高写入量 3. **属性数据**: 设备属性(服务器端、客户端、共享) - 存储位置: PostgreSQL - 特点: 键值对数据 4. **最新值数据**: 时序数据的最新值 - 存储位置: PostgreSQL - 特点: 快速查询最新值 ## 3. 时序数据存储 ### 3.1 时序数据服务 **位置**: `application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java` **关键方法**: ```java /** * 保存时序数据 */ @Override public void saveTimeseries(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); // 检查是否为内部实体 checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; // 检查是否启用数据库存储 if (sysTenant || !request.getStrategy().saveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { // 验证数据 KvUtils.validate(request.getEntries(), valueNoXssValidation); // 保存时序数据 ListenableFuture future = saveTimeseriesInternal(request); if (request.getStrategy().saveTimeseries()) { Futures.addCallback(future, getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant), tsCallBackExecutor); } } else { request.getCallback().onFailure( new RuntimeException("DB storage writes are disabled due to API limits!")); } } ``` ### 3.2 时序数据 DAO **位置**: `dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java` **关键方法**: ```java /** * 保存时序数据 */ private ListenableFuture doSave( TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl, boolean saveLatest, boolean saveTs) { // 1. 准备分区保存任务 List> tsFutures = saveTs ? new ArrayList<>(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST) : null; // 2. 准备最新值保存任务 List> latestFutures = saveLatest ? new ArrayList<>(tsKvEntries.size()) : null; // 3. 为每个数据点创建保存任务 for (TsKvEntry tsKvEntry : tsKvEntries) { if (saveTs) { // 保存分区信息 tsFutures.add(timeseriesDao.savePartition( tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey())); // 保存时序数据 tsFutures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); } if (saveLatest) { // 保存最新值 latestFutures.add(Futures.transform( timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), version -> { if (version != null) { // 通知 EDQS 服务更新 TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId; edqsService.onUpdate(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version)); } return version; }, MoreExecutors.directExecutor())); } } // 4. 合并所有 Future ListenableFuture dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0); // 5. 返回结果 return Futures.transform( Futures.allAsList(Arrays.asList(dpsFuture, versionsFuture)), results -> new TimeseriesSaveResult(...), MoreExecutors.directExecutor()); } ``` ### 3.3 SQL 时序数据存储 **位置**: `dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlInsertTsRepository.java` PostgreSQL 时序数据存储实现: ```java /** * 保存时序数据到 PostgreSQL */ @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { // 1. 构建 SQL 插入语句 String partition = getPartition(tsKvEntry.getTs()); String tableName = getTableName(partition); // 2. 执行插入 return executeAsync(() -> { return jdbcTemplate.update( "INSERT INTO " + tableName + " (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", entityId.getId(), tsKvEntry.getKey(), tsKvEntry.getTs(), getBoolValue(tsKvEntry), getStrValue(tsKvEntry), getLongValue(tsKvEntry), getDblValue(tsKvEntry), getJsonValue(tsKvEntry) ); }); } ``` ### 3.4 TimescaleDB 时序数据存储 **位置**: `dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java` TimescaleDB 是 PostgreSQL 的时序数据扩展,提供更好的性能: ```java /** * 保存时序数据到 TimescaleDB * TimescaleDB 提供了 Hypertable 特性,自动管理分区 */ @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { // TimescaleDB 使用 Hypertable,不需要手动管理分区 return executeAsync(() -> { return jdbcTemplate.update( "INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)", entityId.getId(), tsKvEntry.getKey(), tsKvEntry.getTs(), getBoolValue(tsKvEntry), getStrValue(tsKvEntry), getLongValue(tsKvEntry), getDblValue(tsKvEntry), getJsonValue(tsKvEntry) ); }); } ``` ### 3.5 Cassandra 时序数据存储 **位置**: `dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java` Cassandra 提供高写入性能,适合大规模时序数据: ```java /** * 批量保存时序数据到 Cassandra */ public ListenableFuture save(TenantId tenantId, EntityId entityId, List entries, long ttl) { // 1. 构建 Cassandra 插入语句 List statements = new ArrayList<>(); for (TsKvEntry entry : entries) { // 2. 构建分区键(entity_id + key + partition) String partition = getPartition(entry.getTs()); // 3. 构建插入语句 Insert insert = QueryBuilder.insertInto("ts_kv_cf") .value("entity_id", entityId.getId()) .value("key", entry.getKey()) .value("partition", partition) .value("ts", entry.getTs()) .value("bool_v", getBoolValue(entry)) .value("str_v", getStrValue(entry)) .value("long_v", getLongValue(entry)) .value("dbl_v", getDblValue(entry)) .value("json_v", getJsonValue(entry)); if (ttl > 0) { insert.using(QueryBuilder.ttl((int) ttl)); } statements.add(insert); } // 4. 批量执行 return executeAsync(() -> { session.execute(BatchStatement.newInstance(BatchStatement.Type.UNLOGGED, statements)); return null; }); } ``` ## 4. 数据分区策略 ### 4.1 SQL 分区 PostgreSQL 使用表分区来管理时序数据: ```java /** * 获取分区名称 * 分区策略:按月分区 */ private String getPartition(long ts) { Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); calendar.setTimeInMillis(ts); return String.format("ts_kv_%d_%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1); } ``` ### 4.2 Cassandra 分区 Cassandra 使用分区键来分布数据: ```java /** * 获取 Cassandra 分区 * 分区策略:按天分区 */ private String getPartition(long ts) { return TimeUUIDs.timeUuid(ts).toString(); } ``` ## 5. 最新值存储 ### 5.1 最新值 DAO **位置**: `dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java` 最新值存储在独立的表中,便于快速查询: ```java /** * 保存最新值 */ @Override public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { // 1. 构建 SQL UPDATE 语句(使用 ON CONFLICT 处理冲突) String sql = "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?) " + "ON CONFLICT (entity_id, key) DO UPDATE SET " + "ts = EXCLUDED.ts, " + "bool_v = EXCLUDED.bool_v, " + "str_v = EXCLUDED.str_v, " + "long_v = EXCLUDED.long_v, " + "dbl_v = EXCLUDED.dbl_v, " + "json_v = EXCLUDED.json_v"; // 2. 执行更新 return executeAsync(() -> { return jdbcTemplate.update(sql, ...); }); } ``` ## 6. 属性数据存储 ### 6.1 属性服务 **位置**: `application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java` 属性数据包括: - **SERVER_SCOPE**: 服务器端属性 - **SHARED_SCOPE**: 共享属性 - **CLIENT_SCOPE**: 客户端属性 ```java /** * 保存属性数据 */ public void saveAttributes(AttributesSaveRequest request) { // 1. 验证权限 // 2. 保存属性 // 3. 发送更新通知 } ``` ## 7. 数据查询 ### 7.1 时序数据查询 **位置**: `dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java` ```java /** * 查询时序数据 */ public ListenableFuture> findAll(TenantId tenantId, EntityId entityId, List queries) { // 1. 构建查询任务列表 List>> futures = new ArrayList<>(); for (ReadTsKvQuery query : queries) { // 2. 根据查询类型选择查询方法 if (query.getAggregation() == Aggregation.NONE) { futures.add(timeseriesDao.findAll(tenantId, entityId, query)); } else { futures.add(timeseriesDao.findAggregate(tenantId, entityId, query)); } } // 3. 合并结果 return Futures.transform(Futures.allAsList(futures), results -> results.stream().flatMap(List::stream).collect(Collectors.toList()), MoreExecutors.directExecutor()); } ``` ### 7.2 聚合查询 支持以下聚合函数: - **AVG**: 平均值 - **SUM**: 求和 - **MIN**: 最小值 - **MAX**: 最大值 - **COUNT**: 计数 ## 8. 数据 TTL (Time To Live) ### 8.1 TTL 配置 数据可以配置 TTL,自动过期删除: ```java /** * 保存带 TTL 的数据 */ public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { if (ttl > 0) { // PostgreSQL: 使用 DELETE 触发器或定期清理 // Cassandra: 使用 TTL 特性 insert.using(QueryBuilder.ttl((int) ttl)); } } ``` ## 9. 性能优化 ### 9.1 批量插入 时序数据使用批量插入提高性能: ```java /** * 批量保存 */ public ListenableFuture saveBatch(List entries) { // 使用批量插入语句 BatchStatement batch = BatchStatement.newInstance(...); for (TsKvEntry entry : entries) { batch.add(buildInsert(entry)); } return executeAsync(() -> session.execute(batch)); } ``` ### 9.2 异步处理 所有数据库操作都是异步的,使用 ListenableFuture: ```java /** * 异步执行数据库操作 */ private ListenableFuture executeAsync(Callable callable) { return executorService.submit(callable); } ``` ## 10. 总结 ThingsBoard 的数据存储系统具有以下特点: 1. **分层设计**: 清晰的 DAO 层抽象,支持多种数据库 2. **多数据库支持**: PostgreSQL、TimescaleDB、Cassandra 3. **分区策略**: SQL 按月分区,Cassandra 按天分区 4. **最新值优化**: 独立的最新值表,快速查询 5. **异步处理**: 所有操作异步执行,提高性能 6. **TTL 支持**: 自动数据过期清理 7. **批量操作**: 支持批量插入,提高写入性能 这套数据存储系统能够处理大规模的物联网时序数据,同时保持良好的查询性能。