thingsboard/summary/03-核心模块源码分析/03-数据存储分析.md

466 lines
14 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ThingsBoard 数据存储源码分析
## 1. 概述
ThingsBoard 的数据存储层采用分层设计支持多种数据库存储不同类型的任务。实体数据存储在关系型数据库PostgreSQL时序数据可以存储在 PostgreSQL/TimescaleDB 或 Cassandra。
## 2. 数据存储架构
### 2.1 存储分层
```
应用层
├── TelemetryService (遥测服务)
├── AttributeService (属性服务)
└── EntityService (实体服务)
数据访问层 (DAO)
├── TimeseriesDao (时序数据)
├── AttributeDao (属性数据)
└── EntityDao (实体数据)
数据库层
├── PostgreSQL (实体数据 + 时序数据)
├── TimescaleDB (时序数据扩展)
└── Cassandra (时序数据)
```
### 2.2 数据分类
1. **实体数据**: 租户、用户、设备、资产等
- 存储位置: PostgreSQL
- 特点: 结构化数据,需要事务支持
2. **时序数据**: 设备遥测数据
- 存储位置: PostgreSQL/TimescaleDB 或 Cassandra
- 特点: 时间序列数据,高写入量
3. **属性数据**: 设备属性(服务器端、客户端、共享)
- 存储位置: PostgreSQL
- 特点: 键值对数据
4. **最新值数据**: 时序数据的最新值
- 存储位置: PostgreSQL
- 特点: 快速查询最新值
## 3. 时序数据存储
### 3.1 时序数据服务
**位置**: `application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java`
**关键方法**:
```java
/**
* 保存时序数据
*/
@Override
public void saveTimeseries(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
// 检查是否为内部实体
checkInternalEntity(entityId);
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
// 检查是否启用数据库存储
if (sysTenant || !request.getStrategy().saveTimeseries() ||
apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
// 验证数据
KvUtils.validate(request.getEntries(), valueNoXssValidation);
// 保存时序数据
ListenableFuture<TimeseriesSaveResult> future = saveTimeseriesInternal(request);
if (request.getStrategy().saveTimeseries()) {
Futures.addCallback(future, getApiUsageCallback(tenantId,
request.getCustomerId(), sysTenant), tsCallBackExecutor);
}
} else {
request.getCallback().onFailure(
new RuntimeException("DB storage writes are disabled due to API limits!"));
}
}
```
### 3.2 时序数据 DAO
**位置**: `dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java`
**关键方法**:
```java
/**
* 保存时序数据
*/
private ListenableFuture<TimeseriesSaveResult> doSave(
TenantId tenantId,
EntityId entityId,
List<TsKvEntry> tsKvEntries,
long ttl,
boolean saveLatest,
boolean saveTs) {
// 1. 准备分区保存任务
List<ListenableFuture<Integer>> tsFutures = saveTs ?
new ArrayList<>(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST) : null;
// 2. 准备最新值保存任务
List<ListenableFuture<Long>> latestFutures = saveLatest ?
new ArrayList<>(tsKvEntries.size()) : null;
// 3. 为每个数据点创建保存任务
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (saveTs) {
// 保存分区信息
tsFutures.add(timeseriesDao.savePartition(
tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
// 保存时序数据
tsFutures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
}
if (saveLatest) {
// 保存最新值
latestFutures.add(Futures.transform(
timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry),
version -> {
if (version != null) {
// 通知 EDQS 服务更新
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ?
(TenantId) entityId : tenantId;
edqsService.onUpdate(edqsTenantId, ObjectType.LATEST_TS_KV,
new LatestTsKv(entityId, tsKvEntry, version));
}
return version;
}, MoreExecutors.directExecutor()));
}
}
// 4. 合并所有 Future
ListenableFuture<Integer> dpsFuture = saveTs ?
Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS,
MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
// 5. 返回结果
return Futures.transform(
Futures.allAsList(Arrays.asList(dpsFuture, versionsFuture)),
results -> new TimeseriesSaveResult(...),
MoreExecutors.directExecutor());
}
```
### 3.3 SQL 时序数据存储
**位置**: `dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlInsertTsRepository.java`
PostgreSQL 时序数据存储实现:
```java
/**
* 保存时序数据到 PostgreSQL
*/
@Override
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId,
TsKvEntry tsKvEntry, long ttl) {
// 1. 构建 SQL 插入语句
String partition = getPartition(tsKvEntry.getTs());
String tableName = getTableName(partition);
// 2. 执行插入
return executeAsync(() -> {
return jdbcTemplate.update(
"INSERT INTO " + tableName + " (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
entityId.getId(),
tsKvEntry.getKey(),
tsKvEntry.getTs(),
getBoolValue(tsKvEntry),
getStrValue(tsKvEntry),
getLongValue(tsKvEntry),
getDblValue(tsKvEntry),
getJsonValue(tsKvEntry)
);
});
}
```
### 3.4 TimescaleDB 时序数据存储
**位置**: `dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/timescale/TimescaleInsertTsRepository.java`
TimescaleDB 是 PostgreSQL 的时序数据扩展,提供更好的性能:
```java
/**
* 保存时序数据到 TimescaleDB
* TimescaleDB 提供了 Hypertable 特性,自动管理分区
*/
@Override
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId,
TsKvEntry tsKvEntry, long ttl) {
// TimescaleDB 使用 Hypertable不需要手动管理分区
return executeAsync(() -> {
return jdbcTemplate.update(
"INSERT INTO ts_kv (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
entityId.getId(),
tsKvEntry.getKey(),
tsKvEntry.getTs(),
getBoolValue(tsKvEntry),
getStrValue(tsKvEntry),
getLongValue(tsKvEntry),
getDblValue(tsKvEntry),
getJsonValue(tsKvEntry)
);
});
}
```
### 3.5 Cassandra 时序数据存储
**位置**: `dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java`
Cassandra 提供高写入性能,适合大规模时序数据:
```java
/**
* 批量保存时序数据到 Cassandra
*/
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId,
List<TsKvEntry> entries, long ttl) {
// 1. 构建 Cassandra 插入语句
List<Statement> statements = new ArrayList<>();
for (TsKvEntry entry : entries) {
// 2. 构建分区键entity_id + key + partition
String partition = getPartition(entry.getTs());
// 3. 构建插入语句
Insert insert = QueryBuilder.insertInto("ts_kv_cf")
.value("entity_id", entityId.getId())
.value("key", entry.getKey())
.value("partition", partition)
.value("ts", entry.getTs())
.value("bool_v", getBoolValue(entry))
.value("str_v", getStrValue(entry))
.value("long_v", getLongValue(entry))
.value("dbl_v", getDblValue(entry))
.value("json_v", getJsonValue(entry));
if (ttl > 0) {
insert.using(QueryBuilder.ttl((int) ttl));
}
statements.add(insert);
}
// 4. 批量执行
return executeAsync(() -> {
session.execute(BatchStatement.newInstance(BatchStatement.Type.UNLOGGED, statements));
return null;
});
}
```
## 4. 数据分区策略
### 4.1 SQL 分区
PostgreSQL 使用表分区来管理时序数据:
```java
/**
* 获取分区名称
* 分区策略:按月分区
*/
private String getPartition(long ts) {
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
calendar.setTimeInMillis(ts);
return String.format("ts_kv_%d_%02d",
calendar.get(Calendar.YEAR),
calendar.get(Calendar.MONTH) + 1);
}
```
### 4.2 Cassandra 分区
Cassandra 使用分区键来分布数据:
```java
/**
* 获取 Cassandra 分区
* 分区策略:按天分区
*/
private String getPartition(long ts) {
return TimeUUIDs.timeUuid(ts).toString();
}
```
## 5. 最新值存储
### 5.1 最新值 DAO
**位置**: `dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java`
最新值存储在独立的表中,便于快速查询:
```java
/**
* 保存最新值
*/
@Override
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId,
TsKvEntry tsKvEntry) {
// 1. 构建 SQL UPDATE 语句(使用 ON CONFLICT 处理冲突)
String sql = "INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (entity_id, key) DO UPDATE SET " +
"ts = EXCLUDED.ts, " +
"bool_v = EXCLUDED.bool_v, " +
"str_v = EXCLUDED.str_v, " +
"long_v = EXCLUDED.long_v, " +
"dbl_v = EXCLUDED.dbl_v, " +
"json_v = EXCLUDED.json_v";
// 2. 执行更新
return executeAsync(() -> {
return jdbcTemplate.update(sql, ...);
});
}
```
## 6. 属性数据存储
### 6.1 属性服务
**位置**: `application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java`
属性数据包括:
- **SERVER_SCOPE**: 服务器端属性
- **SHARED_SCOPE**: 共享属性
- **CLIENT_SCOPE**: 客户端属性
```java
/**
* 保存属性数据
*/
public void saveAttributes(AttributesSaveRequest request) {
// 1. 验证权限
// 2. 保存属性
// 3. 发送更新通知
}
```
## 7. 数据查询
### 7.1 时序数据查询
**位置**: `dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java`
```java
/**
* 查询时序数据
*/
public ListenableFuture<List<TsKvEntry>> findAll(TenantId tenantId,
EntityId entityId,
List<ReadTsKvQuery> queries) {
// 1. 构建查询任务列表
List<ListenableFuture<List<TsKvEntry>>> futures = new ArrayList<>();
for (ReadTsKvQuery query : queries) {
// 2. 根据查询类型选择查询方法
if (query.getAggregation() == Aggregation.NONE) {
futures.add(timeseriesDao.findAll(tenantId, entityId, query));
} else {
futures.add(timeseriesDao.findAggregate(tenantId, entityId, query));
}
}
// 3. 合并结果
return Futures.transform(Futures.allAsList(futures),
results -> results.stream().flatMap(List::stream).collect(Collectors.toList()),
MoreExecutors.directExecutor());
}
```
### 7.2 聚合查询
支持以下聚合函数:
- **AVG**: 平均值
- **SUM**: 求和
- **MIN**: 最小值
- **MAX**: 最大值
- **COUNT**: 计数
## 8. 数据 TTL (Time To Live)
### 8.1 TTL 配置
数据可以配置 TTL自动过期删除
```java
/**
* 保存带 TTL 的数据
*/
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId,
TsKvEntry tsKvEntry, long ttl) {
if (ttl > 0) {
// PostgreSQL: 使用 DELETE 触发器或定期清理
// Cassandra: 使用 TTL 特性
insert.using(QueryBuilder.ttl((int) ttl));
}
}
```
## 9. 性能优化
### 9.1 批量插入
时序数据使用批量插入提高性能:
```java
/**
* 批量保存
*/
public ListenableFuture<Void> saveBatch(List<TsKvEntry> entries) {
// 使用批量插入语句
BatchStatement batch = BatchStatement.newInstance(...);
for (TsKvEntry entry : entries) {
batch.add(buildInsert(entry));
}
return executeAsync(() -> session.execute(batch));
}
```
### 9.2 异步处理
所有数据库操作都是异步的,使用 ListenableFuture
```java
/**
* 异步执行数据库操作
*/
private <T> ListenableFuture<T> executeAsync(Callable<T> callable) {
return executorService.submit(callable);
}
```
## 10. 总结
ThingsBoard 的数据存储系统具有以下特点:
1. **分层设计**: 清晰的 DAO 层抽象,支持多种数据库
2. **多数据库支持**: PostgreSQL、TimescaleDB、Cassandra
3. **分区策略**: SQL 按月分区Cassandra 按天分区
4. **最新值优化**: 独立的最新值表,快速查询
5. **异步处理**: 所有操作异步执行,提高性能
6. **TTL 支持**: 自动数据过期清理
7. **批量操作**: 支持批量插入,提高写入性能
这套数据存储系统能够处理大规模的物联网时序数据,同时保持良好的查询性能。