thingsboard/summary/镜像流量分析/03-tb-mqtt-transport流量分析.md

20 KiB
Raw Permalink Blame History

tb-mqtt-transport 流量分析

1. 镜像概述

tb-mqtt-transport 是 ThingsBoard 的 MQTT 传输服务,负责:

  • 接收设备通过 MQTT 协议发送的消息
  • 设备认证和会话管理
  • 消息格式转换JSON、Protobuf等
  • 将设备消息发送到核心服务通过Kafka
  • 接收核心服务消息并推送到设备

镜像名称: thingsboard/tb-mqtt-transport
源码位置:

  • 主应用: transport/mqtt/src/main/java/org/thingsboard/server/mqtt/
  • 传输处理: common/transport/mqtt/

端口: 1883 (MQTT)


2. 流量入口

2.1 MQTT 协议入口

入口点:

  • common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java
  • common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

端口: 1883 (MQTT协议)

流量来源:

  • IoT设备: 通过MQTT协议连接并发送数据
  • MQTT客户端: 各种MQTT客户端库
  • 网关设备: MQTT网关转发设备数据

MQTT消息类型:

  • CONNECT: 设备连接请求
  • PUBLISH: 设备发布消息遥测、属性、RPC响应等
  • SUBSCRIBE: 设备订阅主题属性更新、RPC请求等
  • DISCONNECT: 设备断开连接

2.2 Kafka 消息入口(下行)

入口点:

  • common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

Kafka主题: tb.transport.* (通知主题)

流量来源:

  • tb-core: 核心服务推送的消息RPC请求、属性更新通知等

消息类型:

  • ToTransportMsg: 传输层消息
    • ToDeviceRpcRequestMsg: RPC请求
    • AttributeUpdateNotificationMsg: 属性更新通知
    • SessionCloseNotificationProto: 会话关闭通知

3. 流量处理流程

3.1 设备连接流程(上行)

设备 (MQTT CONNECT)
  ↓
MqttTransportServerInitializer (Netty Channel初始化)
  ↓
MqttTransportHandler.channelRegistered()
  ↓
设备CONNECT消息
  ↓
MqttTransportHandler.processConnect()
  ↓
设备认证
  ├─ 通过TransportApiRequestMsg查询设备
  ├─ 验证设备凭证AccessToken等
  └─ 获取设备信息
  ↓
创建设备会话 (DeviceSessionCtx)
  ↓
发送CONNACK确认
  ↓
订阅通知主题 (Kafka: tb.transport.*)
  ↓
设备连接完成

核心代码模块:

  1. MqttTransportService (common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java)

    • 功能: MQTT传输服务启动和管理
    • 关键方法:
      • init(): 初始化Netty服务器绑定MQTT端口
      @PostConstruct
      public void init() throws Exception {
          ServerBootstrap b = new ServerBootstrap();
          b.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new MqttTransportServerInitializer(context, false))
                  .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
      
          serverChannel = b.bind(host, port).sync().channel();
      }
      
  2. MqttTransportHandler (common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java)

    • 功能: 处理MQTT消息的核心Handler
    • 关键方法:
      • channelRead(): Netty Channel读取消息
      • processMqttMsg(): 处理MQTT消息
      • processConnect(): 处理设备连接

3.2 设备数据上报流程(上行)

设备 (MQTT PUBLISH)
  主题: v1/devices/me/telemetry 或 v1/devices/me/attributes
  ↓
MqttTransportHandler.channelRead()
  ↓
MqttTransportHandler.processMqttMsg()
  ↓
processDevicePublish() (判断主题类型)
  ├─ 遥测数据 (isDeviceTelemetryTopic)
  │   └─ PostTelemetryMsg
  ├─ 属性数据 (isDeviceAttributesTopic)
  │   └─ PostAttributeMsg
  ├─ RPC响应 (DEVICE_RPC_RESPONSE_TOPIC)
  │   └─ ToDeviceRpcResponseMsg
  └─ 其他类型...
  ↓
MqttTransportAdaptor转换消息格式
  ├─ JSON格式 → Protobuf
  ├─ Protobuf格式 → Protobuf
  └─ 自定义格式 → Protobuf
  ↓
DefaultTransportService.process()
  ↓
构建ToCoreMsg
  ├─ TransportToDeviceActorMsg (设备消息)
  └─ 其他设备事件
  ↓
发送到Kafka (tb.core.* 主题)
  ↓
tb-core 接收并处理

核心代码模块:

  1. MqttTransportHandler.processDevicePublish() (common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java)

    • 功能: 处理设备发布的MQTT消息
    • 关键逻辑:
      private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
          // 判断主题类型
          if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
              // 遥测数据
              TransportProtos.PostTelemetryMsg postTelemetryMsg = 
                  payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
              transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, ...);
          } else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
              // 属性数据
              TransportProtos.PostAttributeMsg postAttributeMsg = 
                  payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
              transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, ...);
          }
          // ...
      }
      
  2. MqttTransportAdaptor (common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java)

    • 功能: MQTT消息格式转换
    • 支持格式:
      • JSON格式
      • Protobuf格式
      • 自定义格式
  3. DefaultTransportService.process() (common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java)

    • 功能: 传输服务统一处理接口
    • 关键方法:
      • process(PostTelemetryMsg): 处理遥测数据
      • process(PostAttributeMsg): 处理属性数据
      • sendToCore(): 发送消息到核心服务

3.3 核心服务消息推送流程(下行)

tb-core 发送消息
  ↓
Kafka (tb.transport.* 主题)
  ↓
DefaultTransportService.transportNotificationsConsumer
  ↓
消费消息 (ToTransportMsg)
  ├─ ToDeviceRpcRequestMsg: RPC请求
  ├─ AttributeUpdateNotificationMsg: 属性更新
  └─ SessionCloseNotificationProto: 会话关闭
  ↓
根据nodeId路由到对应的传输服务实例
  ↓
MqttTransportHandler处理
  ├─ RPC请求 → 发布到设备订阅的RPC主题
  ├─ 属性更新 → 发布到设备订阅的属性主题
  └─ 会话关闭 → 关闭MQTT连接
  ↓
设备接收MQTT消息

核心代码模块:

  1. DefaultTransportService.init() - 初始化通知消费者

    @PostConstruct
    public void init() {
        // 创建通知消费者
        transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder()
                .name("TB Transport Notifications")
                .msgPackProcessor(this::processNotifications)
                .pollInterval(notificationsPollDuration)
                .consumerCreator(() -> queueProvider.createTransportNotificationsConsumer())
                .consumerExecutor(consumerExecutor)
                .threadPrefix("transport-notifications")
                .build();
    }
    
  2. processNotifications() - 处理通知消息

    private void processNotifications(List<TbProtoQueueMsg<ToTransportMsg>> msgs, ...) {
        msgs.forEach(msg -> {
            ToTransportMsg toTransportMsg = msg.getValue();
            // 根据消息类型处理
            if (toTransportMsg.hasToDeviceRpcRequestMsg()) {
                // RPC请求
                processRpcRequest(toTransportMsg.getToDeviceRpcRequestMsg());
            } else if (toTransportMsg.hasAttributeUpdateNotificationMsg()) {
                // 属性更新
                processAttributeUpdate(toTransportMsg.getAttributeUpdateNotificationMsg());
            }
        });
    }
    
  3. MqttTransportHandler发送MQTT消息 - 将消息推送到设备

    private void sendRpcRequest(ToDeviceRpcRequestMsg rpcRequest) {
        // 获取设备订阅的RPC请求主题
        String topic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
        // 发布MQTT消息
        publish(ctx, topic, rpcRequestPayload, MqttQoS.AT_LEAST_ONCE);
    }
    

3.4 设备订阅流程

设备 (MQTT SUBSCRIBE)
  主题: v1/devices/me/attributes 或 v1/devices/me/rpc/request/+
  ↓
MqttTransportHandler.processSubscribe()
  ↓
判断订阅主题类型
  ├─ 属性订阅 → 订阅属性更新
  ├─ RPC请求订阅 → 订阅RPC请求
  └─ 其他订阅...
  ↓
保存订阅信息到DeviceSessionCtx
  ↓
发送SUBACK确认
  ↓
当有属性更新或RPC请求时发布到对应主题

4. 流量出口

4.1 Kafka消息出口上行

出口点: DefaultTransportService.sendToCore()

Kafka主题: tb.core.*

流量去向:

  • tb-core: 核心服务处理设备数据

消息类型:

  • ToCoreMsg: 包含TransportToDeviceActorMsg
    • PostTelemetryMsg: 遥测数据
    • PostAttributeMsg: 属性数据
    • ToDeviceRpcResponseMsg: RPC响应
    • DeviceConnectProto: 设备连接事件
    • DeviceDisconnectProto: 设备断开事件

4.2 MQTT消息出口下行

出口点: MqttTransportHandler.publish()

MQTT主题:

  • v1/devices/{deviceId}/attributes: 属性更新
  • v1/devices/{deviceId}/rpc/request/{requestId}: RPC请求
  • 其他设备订阅的主题

流量去向:

  • IoT设备: 设备接收MQTT消息

消息类型:

  • PUBLISH消息属性更新、RPC请求等

5. 核心代码模块详解

5.1 ThingsboardMqttTransportApplication

位置: transport/mqtt/src/main/java/org/thingsboard/server/mqtt/ThingsboardMqttTransportApplication.java

功能:

  • MQTT传输服务的Spring Boot应用入口
  • 组件扫描配置

关键代码:

@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", 
                "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue", 
                "org.thingsboard.server.cache"})
public class ThingsboardMqttTransportApplication {
    public static void main(String[] args) {
        SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
    }
}

5.2 MqttTransportService

位置: common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportService.java

功能:

  • 启动和管理MQTT服务器基于Netty
  • 处理SSL连接可选
  • 管理Netty EventLoopGroup

关键方法:

  1. init(): 初始化MQTT服务器

    • 创建Netty ServerBootstrap
    • 绑定MQTT端口默认1883
    • 配置Channel Pipeline
  2. shutdown(): 关闭MQTT服务器

    • 关闭Netty Channel
    • 关闭EventLoopGroup

5.3 MqttTransportHandler

位置: common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java

功能:

  • 处理所有MQTT消息的核心Handler
  • 管理设备会话
  • 消息格式转换和路由

关键方法:

  1. channelRead(): Netty Channel读取消息

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof MqttMessage) {
            MqttMessage message = (MqttMessage) msg;
            if (message.decoderResult().isSuccess()) {
                processMqttMsg(ctx, message);
            }
        }
    }
    
  2. processMqttMsg(): 处理MQTT消息

    • 根据消息类型CONNECT、PUBLISH、SUBSCRIBE等分发处理
  3. processConnect(): 处理设备连接

    • 设备认证
    • 创建设备会话
    • 发送CONNACK
  4. processDevicePublish(): 处理设备发布消息

    • 判断主题类型
    • 转换消息格式
    • 调用transportService.process()
  5. processSubscribe(): 处理设备订阅

    • 保存订阅信息
    • 发送SUBACK

5.4 DeviceSessionCtx

位置: common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/device/MqttDeviceSessionCtx.java

功能:

  • 管理设备MQTT会话信息
  • 存储设备ID、会话ID等
  • 管理订阅的主题
  • 缓存设备配置

关键属性:

  • sessionId: 会话ID
  • deviceId: 设备ID
  • tenantId: 租户ID
  • subscriptions: 订阅的主题列表

5.5 DefaultTransportService

位置: common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

功能:

  • 传输服务的统一接口实现
  • 处理所有类型的设备消息
  • 发送消息到核心服务
  • 接收核心服务消息并推送到设备

关键方法:

  1. process(PostTelemetryMsg): 处理遥测数据

    public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, ...) {
        // 构建TransportToDeviceActorMsg
        TransportToDeviceActorMsg.Builder builder = TransportToDeviceActorMsg.newBuilder()
                .setSessionInfo(sessionInfo)
                .setPostTelemetry(msg);
    
        // 发送到核心服务
        sendToCore(tenantId, deviceId, builder.build(), ...);
    }
    
  2. sendToCore(): 发送消息到核心服务

    private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, ...) {
        // 解析目标分区
        TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
    
        // 发送到Kafka
        tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), callback);
    }
    
  3. processNotifications(): 处理来自核心服务的通知

    • 消费Kafka通知消息
    • 路由到对应的设备会话
    • 通过MQTT推送到设备

6. 数据流向图

┌─────────────────────────────────────────────────────────────────┐
│                      流量入口                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT协议 (端口1883)                                           │
│    ├─ 设备连接 (CONNECT)                                         │
│    ├─ 设备发布 (PUBLISH)                                         │
│    │   ├─ 遥测数据: v1/devices/me/telemetry                     │
│    │   ├─ 属性数据: v1/devices/me/attributes                    │
│    │   └─ RPC响应: v1/devices/me/rpc/response/+                │
│    └─ 设备订阅 (SUBSCRIBE)                                       │
│                                                                   │
│ 2. Kafka通知 (tb.transport.* 主题)                              │
│    └─ tb-core 推送的消息                                         │
│        ├─ RPC请求                                               │
│        ├─ 属性更新通知                                           │
│        └─ 会话关闭通知                                           │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      流量处理                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. MQTT消息处理                                                 │
│    MqttTransportHandler                                         │
│      ├─ 设备连接: processConnect()                              │
│      │   ├─ 设备认证                                             │
│      │   └─ 创建会话 (DeviceSessionCtx)                         │
│      ├─ 数据上报: processDevicePublish()                        │
│      │   ├─ 判断主题类型                                         │
│      │   ├─ 格式转换 (MqttTransportAdaptor)                     │
│      │   └─ 调用 transportService.process()                    │
│      └─ 设备订阅: processSubscribe()                            │
│                                                                   │
│ 2. 传输服务处理                                                 │
│    DefaultTransportService                                      │
│      ├─ process(PostTelemetryMsg)                               │
│      ├─ process(PostAttributeMsg)                               │
│      └─ sendToCore() → Kafka (tb.core.*)                      │
│                                                                   │
│ 3. 通知消息处理                                                 │
│    DefaultTransportService.processNotifications()              │
│      ├─ 消费Kafka通知消息                                       │
│      ├─ 路由到设备会话                                           │
│      └─ 通过MQTT推送到设备                                       │
└─────────────────────────────────────────────────────────────────┘
                              ↓
┌─────────────────────────────────────────────────────────────────┐
│                      流量出口                                     │
├─────────────────────────────────────────────────────────────────┤
│ 1. Kafka消息 (tb.core.* 主题)                                   │
│    → tb-core: 设备数据、事件等                                   │
│                                                                   │
│ 2. MQTT消息 (推送到设备)                                         │
│    → IoT设备: 属性更新、RPC请求等                                │
└─────────────────────────────────────────────────────────────────┘

7. 关键配置

7.1 环境变量配置

  • ZOOKEEPER_URL=zookeeper:2181: ZooKeeper地址
  • TB_QUEUE_TYPE=kafka: 队列类型
  • TB_KAFKA_SERVERS=kafka:9092: Kafka地址
  • MQTT_BIND_ADDRESS=0.0.0.0: MQTT绑定地址
  • MQTT_BIND_PORT=1883: MQTT端口

7.2 核心配置项

  • transport.mqtt.bind_address: MQTT绑定地址
  • transport.mqtt.bind_port: MQTT端口
  • transport.mqtt.timeout: MQTT超时时间
  • transport.mqtt.netty.boss_group_thread_count: Netty Boss线程数
  • transport.mqtt.netty.worker_group_thread_count: Netty Worker线程数

8. 支持的MQTT主题

8.1 设备遥测数据主题

  • v1/devices/me/telemetry: 标准遥测数据主题
  • telemetry: 短主题(简化模式)

8.2 设备属性主题

  • v1/devices/me/attributes: 客户端属性
  • v1/devices/me/attributes/request/+: 请求共享属性

8.3 RPC主题

  • v1/devices/me/rpc/request/+: 订阅RPC请求
  • v1/devices/me/rpc/response/+: 发送RPC响应

9. 总结

tb-mqtt-transport 是 ThingsBoard 的 MQTT 传输层服务,负责:

  1. MQTT服务器: 基于Netty实现高性能MQTT服务器
  2. 设备连接管理: 设备认证、会话管理
  3. 消息转换: 支持多种消息格式JSON、Protobuf等
  4. 上行数据流: 设备数据 → MQTT → Kafka → tb-core
  5. 下行数据流: tb-core → Kafka → MQTT → 设备
  6. 双向通信: 支持设备上报和服务器推送

整个流程采用了异步非阻塞的Netty架构支持大量并发连接通过Kafka实现与核心服务的解耦通信。