# tb-mqtt-transport 流量分析 ## 1. 镜像概述 **tb-mqtt-transport** 是 ThingsBoard 的 MQTT 传输服务,负责: - 接收设备通过 MQTT 协议发送的消息 - 设备认证和会话管理 - 消息格式转换(JSON、Protobuf等) - 将设备消息发送到核心服务(通过Kafka) - 接收核心服务消息并推送到设备 **镜像名称**: `thingsboard/tb-mqtt-transport` **源码位置**: - 主应用: `transport/mqtt/src/main/java/org/thingsboard/server/mqtt/` - 传输处理: `common/transport/mqtt/` **端口**: `1883` (MQTT) --- ## 2. 流量入口 ### 2.1 MQTT 协议入口 **入口点**: - `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java` - `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java` **端口**: `1883` (MQTT协议) **流量来源**: - **IoT设备**: 通过MQTT协议连接并发送数据 - **MQTT客户端**: 各种MQTT客户端库 - **网关设备**: MQTT网关转发设备数据 **MQTT消息类型**: - `CONNECT`: 设备连接请求 - `PUBLISH`: 设备发布消息(遥测、属性、RPC响应等) - `SUBSCRIBE`: 设备订阅主题(属性更新、RPC请求等) - `DISCONNECT`: 设备断开连接 ### 2.2 Kafka 消息入口(下行) **入口点**: - `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java` **Kafka主题**: `tb.transport.*` (通知主题) **流量来源**: - **tb-core**: 核心服务推送的消息(RPC请求、属性更新通知等) **消息类型**: - `ToTransportMsg`: 传输层消息 - `ToDeviceRpcRequestMsg`: RPC请求 - `AttributeUpdateNotificationMsg`: 属性更新通知 - `SessionCloseNotificationProto`: 会话关闭通知 --- ## 3. 流量处理流程 ### 3.1 设备连接流程(上行) ``` 设备 (MQTT CONNECT) ↓ MqttTransportServerInitializer (Netty Channel初始化) ↓ MqttTransportHandler.channelRegistered() ↓ 设备CONNECT消息 ↓ MqttTransportHandler.processConnect() ↓ 设备认证 ├─ 通过TransportApiRequestMsg查询设备 ├─ 验证设备凭证(AccessToken等) └─ 获取设备信息 ↓ 创建设备会话 (DeviceSessionCtx) ↓ 发送CONNACK确认 ↓ 订阅通知主题 (Kafka: tb.transport.*) ↓ 设备连接完成 ``` **核心代码模块**: 1. **MqttTransportService** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java`) - **功能**: MQTT传输服务启动和管理 - **关键方法**: - `init()`: 初始化Netty服务器,绑定MQTT端口 ```java @PostConstruct public void init() throws Exception { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MqttTransportServerInitializer(context, false)) .childOption(ChannelOption.SO_KEEPALIVE, keepAlive); serverChannel = b.bind(host, port).sync().channel(); } ``` 2. **MqttTransportHandler** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`) - **功能**: 处理MQTT消息的核心Handler - **关键方法**: - `channelRead()`: Netty Channel读取消息 - `processMqttMsg()`: 处理MQTT消息 - `processConnect()`: 处理设备连接 ### 3.2 设备数据上报流程(上行) ``` 设备 (MQTT PUBLISH) 主题: v1/devices/me/telemetry 或 v1/devices/me/attributes ↓ MqttTransportHandler.channelRead() ↓ MqttTransportHandler.processMqttMsg() ↓ processDevicePublish() (判断主题类型) ├─ 遥测数据 (isDeviceTelemetryTopic) │ └─ PostTelemetryMsg ├─ 属性数据 (isDeviceAttributesTopic) │ └─ PostAttributeMsg ├─ RPC响应 (DEVICE_RPC_RESPONSE_TOPIC) │ └─ ToDeviceRpcResponseMsg └─ 其他类型... ↓ MqttTransportAdaptor转换消息格式 ├─ JSON格式 → Protobuf ├─ Protobuf格式 → Protobuf └─ 自定义格式 → Protobuf ↓ DefaultTransportService.process() ↓ 构建ToCoreMsg ├─ TransportToDeviceActorMsg (设备消息) └─ 其他设备事件 ↓ 发送到Kafka (tb.core.* 主题) ↓ tb-core 接收并处理 ``` **核心代码模块**: 1. **MqttTransportHandler.processDevicePublish()** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java`) - **功能**: 处理设备发布的MQTT消息 - **关键逻辑**: ```java private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { // 判断主题类型 if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) { // 遥测数据 TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, ...); } else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) { // 属性数据 TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, ...); } // ... } ``` 2. **MqttTransportAdaptor** (`common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java`) - **功能**: MQTT消息格式转换 - **支持格式**: - JSON格式 - Protobuf格式 - 自定义格式 3. **DefaultTransportService.process()** (`common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java`) - **功能**: 传输服务统一处理接口 - **关键方法**: - `process(PostTelemetryMsg)`: 处理遥测数据 - `process(PostAttributeMsg)`: 处理属性数据 - `sendToCore()`: 发送消息到核心服务 ### 3.3 核心服务消息推送流程(下行) ``` tb-core 发送消息 ↓ Kafka (tb.transport.* 主题) ↓ DefaultTransportService.transportNotificationsConsumer ↓ 消费消息 (ToTransportMsg) ├─ ToDeviceRpcRequestMsg: RPC请求 ├─ AttributeUpdateNotificationMsg: 属性更新 └─ SessionCloseNotificationProto: 会话关闭 ↓ 根据nodeId路由到对应的传输服务实例 ↓ MqttTransportHandler处理 ├─ RPC请求 → 发布到设备订阅的RPC主题 ├─ 属性更新 → 发布到设备订阅的属性主题 └─ 会话关闭 → 关闭MQTT连接 ↓ 设备接收MQTT消息 ``` **核心代码模块**: 1. **DefaultTransportService.init()** - 初始化通知消费者 ```java @PostConstruct public void init() { // 创建通知消费者 transportNotificationsConsumer = QueueConsumerManager.>builder() .name("TB Transport Notifications") .msgPackProcessor(this::processNotifications) .pollInterval(notificationsPollDuration) .consumerCreator(() -> queueProvider.createTransportNotificationsConsumer()) .consumerExecutor(consumerExecutor) .threadPrefix("transport-notifications") .build(); } ``` 2. **processNotifications()** - 处理通知消息 ```java private void processNotifications(List> msgs, ...) { msgs.forEach(msg -> { ToTransportMsg toTransportMsg = msg.getValue(); // 根据消息类型处理 if (toTransportMsg.hasToDeviceRpcRequestMsg()) { // RPC请求 processRpcRequest(toTransportMsg.getToDeviceRpcRequestMsg()); } else if (toTransportMsg.hasAttributeUpdateNotificationMsg()) { // 属性更新 processAttributeUpdate(toTransportMsg.getAttributeUpdateNotificationMsg()); } }); } ``` 3. **MqttTransportHandler发送MQTT消息** - 将消息推送到设备 ```java private void sendRpcRequest(ToDeviceRpcRequestMsg rpcRequest) { // 获取设备订阅的RPC请求主题 String topic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; // 发布MQTT消息 publish(ctx, topic, rpcRequestPayload, MqttQoS.AT_LEAST_ONCE); } ``` ### 3.4 设备订阅流程 ``` 设备 (MQTT SUBSCRIBE) 主题: v1/devices/me/attributes 或 v1/devices/me/rpc/request/+ ↓ MqttTransportHandler.processSubscribe() ↓ 判断订阅主题类型 ├─ 属性订阅 → 订阅属性更新 ├─ RPC请求订阅 → 订阅RPC请求 └─ 其他订阅... ↓ 保存订阅信息到DeviceSessionCtx ↓ 发送SUBACK确认 ↓ 当有属性更新或RPC请求时,发布到对应主题 ``` --- ## 4. 流量出口 ### 4.1 Kafka消息出口(上行) **出口点**: `DefaultTransportService.sendToCore()` **Kafka主题**: `tb.core.*` **流量去向**: - **tb-core**: 核心服务处理设备数据 **消息类型**: - `ToCoreMsg`: 包含TransportToDeviceActorMsg - `PostTelemetryMsg`: 遥测数据 - `PostAttributeMsg`: 属性数据 - `ToDeviceRpcResponseMsg`: RPC响应 - `DeviceConnectProto`: 设备连接事件 - `DeviceDisconnectProto`: 设备断开事件 ### 4.2 MQTT消息出口(下行) **出口点**: `MqttTransportHandler.publish()` **MQTT主题**: - `v1/devices/{deviceId}/attributes`: 属性更新 - `v1/devices/{deviceId}/rpc/request/{requestId}`: RPC请求 - 其他设备订阅的主题 **流量去向**: - **IoT设备**: 设备接收MQTT消息 **消息类型**: - PUBLISH消息(属性更新、RPC请求等) --- ## 5. 核心代码模块详解 ### 5.1 ThingsboardMqttTransportApplication **位置**: `transport/mqtt/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java` **功能**: - MQTT传输服务的Spring Boot应用入口 - 组件扫描配置 **关键代码**: ```java @SpringBootConfiguration @EnableAsync @EnableScheduling @ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue", "org.thingsboard.server.cache"}) public class ThingsboardMqttTransportApplication { public static void main(String[] args) { SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args)); } } ``` ### 5.2 MqttTransportService **位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java` **功能**: - 启动和管理MQTT服务器(基于Netty) - 处理SSL连接(可选) - 管理Netty EventLoopGroup **关键方法**: 1. **init()**: 初始化MQTT服务器 - 创建Netty ServerBootstrap - 绑定MQTT端口(默认1883) - 配置Channel Pipeline 2. **shutdown()**: 关闭MQTT服务器 - 关闭Netty Channel - 关闭EventLoopGroup ### 5.3 MqttTransportHandler **位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java` **功能**: - 处理所有MQTT消息的核心Handler - 管理设备会话 - 消息格式转换和路由 **关键方法**: 1. **channelRead()**: Netty Channel读取消息 ```java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof MqttMessage) { MqttMessage message = (MqttMessage) msg; if (message.decoderResult().isSuccess()) { processMqttMsg(ctx, message); } } } ``` 2. **processMqttMsg()**: 处理MQTT消息 - 根据消息类型(CONNECT、PUBLISH、SUBSCRIBE等)分发处理 3. **processConnect()**: 处理设备连接 - 设备认证 - 创建设备会话 - 发送CONNACK 4. **processDevicePublish()**: 处理设备发布消息 - 判断主题类型 - 转换消息格式 - 调用transportService.process() 5. **processSubscribe()**: 处理设备订阅 - 保存订阅信息 - 发送SUBACK ### 5.4 DeviceSessionCtx **位置**: `common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/device/MqttDeviceSessionCtx.java` **功能**: - 管理设备MQTT会话信息 - 存储设备ID、会话ID等 - 管理订阅的主题 - 缓存设备配置 **关键属性**: - `sessionId`: 会话ID - `deviceId`: 设备ID - `tenantId`: 租户ID - `subscriptions`: 订阅的主题列表 ### 5.5 DefaultTransportService **位置**: `common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java` **功能**: - 传输服务的统一接口实现 - 处理所有类型的设备消息 - 发送消息到核心服务 - 接收核心服务消息并推送到设备 **关键方法**: 1. **process(PostTelemetryMsg)**: 处理遥测数据 ```java public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) { // 构建TransportToDeviceActorMsg TransportToDeviceActorMsg.Builder builder = TransportToDeviceActorMsg.newBuilder() .setSessionInfo(sessionInfo) .setPostTelemetry(msg); // 发送到核心服务 sendToCore(tenantId, deviceId, builder.build(), ...); } ``` 2. **sendToCore()**: 发送消息到核心服务 ```java private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, ...) { // 解析目标分区 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); // 发送到Kafka tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback); } ``` 3. **processNotifications()**: 处理来自核心服务的通知 - 消费Kafka通知消息 - 路由到对应的设备会话 - 通过MQTT推送到设备 --- ## 6. 数据流向图 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 流量入口 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. MQTT协议 (端口1883) │ │ ├─ 设备连接 (CONNECT) │ │ ├─ 设备发布 (PUBLISH) │ │ │ ├─ 遥测数据: v1/devices/me/telemetry │ │ │ ├─ 属性数据: v1/devices/me/attributes │ │ │ └─ RPC响应: v1/devices/me/rpc/response/+ │ │ └─ 设备订阅 (SUBSCRIBE) │ │ │ │ 2. Kafka通知 (tb.transport.* 主题) │ │ └─ tb-core 推送的消息 │ │ ├─ RPC请求 │ │ ├─ 属性更新通知 │ │ └─ 会话关闭通知 │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 流量处理 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. MQTT消息处理 │ │ MqttTransportHandler │ │ ├─ 设备连接: processConnect() │ │ │ ├─ 设备认证 │ │ │ └─ 创建会话 (DeviceSessionCtx) │ │ ├─ 数据上报: processDevicePublish() │ │ │ ├─ 判断主题类型 │ │ │ ├─ 格式转换 (MqttTransportAdaptor) │ │ │ └─ 调用 transportService.process() │ │ └─ 设备订阅: processSubscribe() │ │ │ │ 2. 传输服务处理 │ │ DefaultTransportService │ │ ├─ process(PostTelemetryMsg) │ │ ├─ process(PostAttributeMsg) │ │ └─ sendToCore() → Kafka (tb.core.*) │ │ │ │ 3. 通知消息处理 │ │ DefaultTransportService.processNotifications() │ │ ├─ 消费Kafka通知消息 │ │ ├─ 路由到设备会话 │ │ └─ 通过MQTT推送到设备 │ └─────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────┐ │ 流量出口 │ ├─────────────────────────────────────────────────────────────────┤ │ 1. Kafka消息 (tb.core.* 主题) │ │ → tb-core: 设备数据、事件等 │ │ │ │ 2. MQTT消息 (推送到设备) │ │ → IoT设备: 属性更新、RPC请求等 │ └─────────────────────────────────────────────────────────────────┘ ``` --- ## 7. 关键配置 ### 7.1 环境变量配置 - `ZOOKEEPER_URL=zookeeper:2181`: ZooKeeper地址 - `TB_QUEUE_TYPE=kafka`: 队列类型 - `TB_KAFKA_SERVERS=kafka:9092`: Kafka地址 - `MQTT_BIND_ADDRESS=0.0.0.0`: MQTT绑定地址 - `MQTT_BIND_PORT=1883`: MQTT端口 ### 7.2 核心配置项 - `transport.mqtt.bind_address`: MQTT绑定地址 - `transport.mqtt.bind_port`: MQTT端口 - `transport.mqtt.timeout`: MQTT超时时间 - `transport.mqtt.netty.boss_group_thread_count`: Netty Boss线程数 - `transport.mqtt.netty.worker_group_thread_count`: Netty Worker线程数 --- ## 8. 支持的MQTT主题 ### 8.1 设备遥测数据主题 - `v1/devices/me/telemetry`: 标准遥测数据主题 - `telemetry`: 短主题(简化模式) ### 8.2 设备属性主题 - `v1/devices/me/attributes`: 客户端属性 - `v1/devices/me/attributes/request/+`: 请求共享属性 ### 8.3 RPC主题 - `v1/devices/me/rpc/request/+`: 订阅RPC请求 - `v1/devices/me/rpc/response/+`: 发送RPC响应 --- ## 9. 总结 **tb-mqtt-transport** 是 ThingsBoard 的 MQTT 传输层服务,负责: 1. **MQTT服务器**: 基于Netty实现高性能MQTT服务器 2. **设备连接管理**: 设备认证、会话管理 3. **消息转换**: 支持多种消息格式(JSON、Protobuf等) 4. **上行数据流**: 设备数据 → MQTT → Kafka → tb-core 5. **下行数据流**: tb-core → Kafka → MQTT → 设备 6. **双向通信**: 支持设备上报和服务器推送 整个流程采用了**异步非阻塞**的Netty架构,支持大量并发连接,通过**Kafka**实现与核心服务的解耦通信。