353 lines
9.9 KiB
Markdown
353 lines
9.9 KiB
Markdown
# ThingsBoard 设备认证与会话管理源码分析
|
||
|
||
## 1. 概述
|
||
|
||
ThingsBoard 支持多种设备认证方式和会话管理机制。设备通过传输层(MQTT、CoAP、HTTP等)连接后,需要进行认证才能建立会话。
|
||
|
||
## 2. 设备认证
|
||
|
||
### 2.1 认证流程
|
||
|
||
```
|
||
设备连接 -> 传输层接收 -> 验证设备凭证 -> 创建会话 -> 发送到设备 Actor
|
||
```
|
||
|
||
### 2.2 设备认证服务
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/service/security/device/DefaultDeviceAuthService.java`
|
||
|
||
**关键方法**:
|
||
|
||
```java
|
||
/**
|
||
* 处理设备认证请求
|
||
* @param credentialsFilter 设备凭证过滤器
|
||
* @return 认证结果
|
||
*/
|
||
@Override
|
||
public DeviceAuthResult process(DeviceCredentialsFilter credentialsFilter) {
|
||
log.trace("Lookup device credentials using filter {}", credentialsFilter);
|
||
|
||
// 1. 根据凭证ID查找设备凭证
|
||
DeviceCredentials credentials = deviceCredentialsService
|
||
.findDeviceCredentialsByCredentialsId(credentialsFilter.getCredentialsId());
|
||
|
||
if (credentials != null) {
|
||
log.trace("Credentials found {}", credentials);
|
||
|
||
// 2. 验证凭证类型是否匹配
|
||
if (credentials.getCredentialsType() == credentialsFilter.getCredentialsType()) {
|
||
switch (credentials.getCredentialsType()) {
|
||
case ACCESS_TOKEN:
|
||
// Access Token 认证:凭证ID直接匹配凭证值
|
||
return DeviceAuthResult.of(credentials.getDeviceId());
|
||
case X509_CERTIFICATE:
|
||
// X509 证书认证
|
||
return DeviceAuthResult.of(credentials.getDeviceId());
|
||
case LWM2M_CREDENTIALS:
|
||
// LWM2M 凭证认证
|
||
return DeviceAuthResult.of(credentials.getDeviceId());
|
||
default:
|
||
return DeviceAuthResult.of("Credentials Type is not supported yet!");
|
||
}
|
||
} else {
|
||
return DeviceAuthResult.of("Credentials Type mismatch!");
|
||
}
|
||
} else {
|
||
log.trace("Credentials not found!");
|
||
return DeviceAuthResult.of("Credentials Not Found!");
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2.3 支持的认证类型
|
||
|
||
ThingsBoard 支持以下设备认证类型:
|
||
|
||
1. **ACCESS_TOKEN**: Access Token 认证(最常用)
|
||
- 设备使用预配置的 Access Token 进行认证
|
||
- 凭证ID 直接匹配 Access Token
|
||
|
||
2. **X509_CERTIFICATE**: X509 证书认证
|
||
- 使用 TLS/SSL 证书进行认证
|
||
- 需要配置证书和私钥
|
||
|
||
3. **LWM2M_CREDENTIALS**: LWM2M 凭证认证
|
||
- 用于 LWM2M 协议设备
|
||
- 支持多种凭证类型(PSK、RPK、X509)
|
||
|
||
4. **MQTT_BASIC**: MQTT Basic 认证(已废弃)
|
||
|
||
### 2.4 传输层认证集成
|
||
|
||
不同传输协议集成认证的方式:
|
||
|
||
#### MQTT 传输
|
||
|
||
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`
|
||
|
||
MQTT 认证流程:
|
||
1. 设备连接时提供客户端ID和用户名/密码
|
||
2. 传输层调用认证服务验证凭证
|
||
3. 认证成功后建立会话
|
||
|
||
#### HTTP 传输
|
||
|
||
**位置**: `common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java`
|
||
|
||
HTTP 认证流程:
|
||
1. 设备在请求头或URL参数中提供 Access Token
|
||
2. 传输层验证 Token
|
||
3. 认证成功后处理请求
|
||
|
||
#### CoAP 传输
|
||
|
||
**位置**: `common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java`
|
||
|
||
CoAP 认证流程:
|
||
1. 设备在 CoAP 请求中提供凭证
|
||
2. 传输层验证凭证
|
||
3. 认证成功后建立会话
|
||
|
||
## 3. 会话管理
|
||
|
||
### 3.1 会话信息
|
||
|
||
**位置**: `common/proto/src/main/proto/transport.proto`
|
||
|
||
会话信息通过 `SessionInfoProto` 定义:
|
||
|
||
```protobuf
|
||
message SessionInfoProto {
|
||
int64 nodeIdMSB = 1;
|
||
int64 nodeIdLSB = 2;
|
||
int64 sessionIdMSB = 3;
|
||
int64 sessionIdLSB = 4;
|
||
int64 deviceIdMSB = 5;
|
||
int64 deviceIdLSB = 6;
|
||
string deviceName = 7;
|
||
string deviceType = 8;
|
||
int64 deviceProfileIdMSB = 9;
|
||
int64 deviceProfileIdLSB = 10;
|
||
int64 tenantIdMSB = 11;
|
||
int64 tenantIdLSB = 12;
|
||
int64 customerIdMSB = 13;
|
||
int64 customerIdLSB = 14;
|
||
SessionType sessionType = 15;
|
||
// ...
|
||
}
|
||
```
|
||
|
||
### 3.2 会话类型
|
||
|
||
ThingsBoard 支持两种会话类型:
|
||
|
||
1. **SYNC**: 同步会话
|
||
- 设备需要等待响应
|
||
- 用于请求-响应模式
|
||
|
||
2. **ASYNC**: 异步会话
|
||
- 设备不需要等待响应
|
||
- 用于遥测数据上报等场景
|
||
|
||
### 3.3 会话生命周期
|
||
|
||
#### 3.3.1 会话创建
|
||
|
||
**位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`
|
||
|
||
```java
|
||
/**
|
||
* 注册会话
|
||
*/
|
||
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
|
||
// 1. 创建会话上下文
|
||
SessionContext sessionCtx = new SessionContext(sessionInfo, listener);
|
||
|
||
// 2. 注册会话
|
||
sessions.put(sessionId, sessionCtx);
|
||
|
||
// 3. 发送会话打开事件
|
||
sendToCore(tenantId, deviceId,
|
||
ToCoreMsg.newBuilder()
|
||
.setDeviceConnectMsg(DeviceConnectProto.newBuilder()
|
||
.setSessionInfo(sessionInfo)
|
||
.build())
|
||
.build(),
|
||
routingKey, null);
|
||
}
|
||
```
|
||
|
||
#### 3.3.2 会话管理
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java`
|
||
|
||
设备 Actor 管理设备的所有会话:
|
||
|
||
```java
|
||
/**
|
||
* 处理传输层消息
|
||
* 更新会话信息
|
||
*/
|
||
public void process(TransportToDeviceActorMsgWrapper wrapper) {
|
||
TransportToDeviceActorMsg msg = wrapper.getMsg();
|
||
SessionInfoProto sessionInfo = msg.getSessionInfo();
|
||
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(),
|
||
sessionInfo.getSessionIdLSB());
|
||
|
||
// 更新会话活动时间
|
||
updateSessionActivity(sessionId, sessionInfo);
|
||
|
||
// 处理消息
|
||
processMessage(msg);
|
||
}
|
||
```
|
||
|
||
#### 3.3.3 会话超时
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java`
|
||
|
||
```java
|
||
/**
|
||
* 检查会话超时
|
||
*/
|
||
public void checkSessionsTimeout() {
|
||
long currentTime = System.currentTimeMillis();
|
||
List<UUID> expiredSessions = new ArrayList<>();
|
||
|
||
for (Map.Entry<UUID, SessionInfo> entry : sessions.entrySet()) {
|
||
SessionInfo sessionInfo = entry.getValue();
|
||
long timeout = sessionInfo.getTimeout();
|
||
|
||
if (currentTime - sessionInfo.getLastActivityTime() > timeout) {
|
||
expiredSessions.add(entry.getKey());
|
||
}
|
||
}
|
||
|
||
// 关闭超时会话
|
||
for (UUID sessionId : expiredSessions) {
|
||
closeSession(sessionId);
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 3.3.4 会话关闭
|
||
|
||
```java
|
||
/**
|
||
* 关闭会话
|
||
*/
|
||
public void closeSession(UUID sessionId) {
|
||
SessionInfo sessionInfo = sessions.remove(sessionId);
|
||
if (sessionInfo != null) {
|
||
// 发送设备断开消息
|
||
sendToCore(tenantId, deviceId,
|
||
ToCoreMsg.newBuilder()
|
||
.setDeviceDisconnectMsg(DeviceDisconnectProto.newBuilder()
|
||
.setSessionInfo(sessionInfo.getSessionInfo())
|
||
.build())
|
||
.build(),
|
||
routingKey, null);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 4. 设备状态管理
|
||
|
||
### 4.1 设备连接状态
|
||
|
||
**位置**: `application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java`
|
||
|
||
设备状态服务跟踪设备的连接状态:
|
||
|
||
```java
|
||
/**
|
||
* 处理设备连接事件
|
||
*/
|
||
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId,
|
||
SessionInfoProto sessionInfo) {
|
||
// 更新设备连接状态
|
||
save(tenantId, deviceId, "active", true);
|
||
save(tenantId, deviceId, "lastConnectTime", System.currentTimeMillis());
|
||
}
|
||
```
|
||
|
||
### 4.2 设备活动跟踪
|
||
|
||
设备活动包括:
|
||
- 连接/断开事件
|
||
- 遥测数据上报
|
||
- 属性更新
|
||
- RPC 调用
|
||
|
||
## 5. 安全机制
|
||
|
||
### 5.1 凭证存储
|
||
|
||
设备凭证存储在数据库中,支持加密存储:
|
||
|
||
**位置**: `dao/src/main/java/org/thingsboard/server/dao/device/DeviceCredentialsDao.java`
|
||
|
||
### 5.2 访问控制
|
||
|
||
- 设备只能访问自己的数据
|
||
- 租户隔离:不同租户的设备完全隔离
|
||
- 权限验证:所有操作都需要验证权限
|
||
|
||
### 5.3 速率限制
|
||
|
||
**位置**: `common/transport/src/main/java/org/thingsboard/server/common/transport/limits/TransportRateLimitService.java`
|
||
|
||
传输层支持速率限制:
|
||
- 每个设备的消息速率限制
|
||
- 每个租户的消息速率限制
|
||
- 防止恶意设备攻击
|
||
|
||
## 6. 网关设备认证
|
||
|
||
### 6.1 网关概念
|
||
|
||
网关设备可以代表多个子设备进行通信。
|
||
|
||
### 6.2 网关认证流程
|
||
|
||
1. 网关设备使用自己的凭证认证
|
||
2. 认证成功后,网关可以创建子设备会话
|
||
3. 子设备使用网关凭证 + 设备名称进行认证
|
||
|
||
**位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java`
|
||
|
||
```java
|
||
/**
|
||
* 处理网关设备认证
|
||
*/
|
||
protected void processGatewayConnect(String deviceName, MqttConnectMessage msg) {
|
||
// 1. 验证网关凭证
|
||
ValidateDeviceCredentialsResponse response = validateGatewayCredentials(msg);
|
||
|
||
if (response.hasDeviceInfo()) {
|
||
// 2. 获取或创建子设备
|
||
GetOrCreateDeviceFromGatewayResponse deviceResponse =
|
||
getOrCreateDevice(deviceName, response.getDeviceInfo());
|
||
|
||
if (deviceResponse.hasDeviceInfo()) {
|
||
// 3. 创建子设备会话
|
||
establishDeviceSession(deviceResponse.getDeviceInfo());
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
## 7. 总结
|
||
|
||
ThingsBoard 的设备认证和会话管理系统具有以下特点:
|
||
|
||
1. **多种认证方式**: 支持 Access Token、X509 证书、LWM2M 凭证等
|
||
2. **会话管理**: 完善的会话生命周期管理,支持同步和异步会话
|
||
3. **状态跟踪**: 跟踪设备连接状态和活动情况
|
||
4. **安全机制**: 凭证加密存储、访问控制、速率限制
|
||
5. **网关支持**: 支持网关设备代表子设备进行通信
|
||
|
||
这套机制确保了设备连接的安全性和可靠性,同时支持大规模的设备连接。
|
||
|