From 2cdeddeb2c5c12ff2a632b527e31bb2a34ef9d9f Mon Sep 17 00:00:00 2001 From: ziyue <1213642868@qq.com> Date: Tue, 1 Nov 2022 17:27:27 +0800 Subject: [PATCH] =?UTF-8?q?RtpServer=E6=94=AF=E6=8C=81udp=E4=B8=8Etcp?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E5=B9=B6=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/RtpProcess.cpp | 4 +- src/Rtp/RtpProcess.h | 2 +- src/Rtp/RtpServer.cpp | 104 +++++++++++++++++++++++------------------ src/Rtp/RtpServer.h | 8 ++-- 4 files changed, 67 insertions(+), 51 deletions(-) diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b9aa5f1b..4f1f0ce5 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -191,8 +191,8 @@ void RtpProcess::onDetach() { } } -void RtpProcess::setOnDetach(const function &cb) { - _on_detach = cb; +void RtpProcess::setOnDetach(function cb) { + _on_detach = std::move(cb); } string RtpProcess::get_peer_ip() { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 8b951216..afa3e06f 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -50,7 +50,7 @@ public: /** * 设置onDetach事件回调 */ - void setOnDetach(const std::function &cb); + void setOnDetach(std::function cb); /** * 设置onDetach事件回调,false检查RTP超时,true停止 diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 9e066076..f53bf70f 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -31,27 +31,48 @@ class RtcpHelper: public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - RtcpHelper(Socket::Ptr rtcp_sock, RtpProcess::Ptr process) { + RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) { _rtcp_sock = std::move(rtcp_sock); - _process = std::move(process); + _stream_id = std::move(stream_id); } - void onRecvRtp(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len){ - //统计rtp接受情况,用于发送rr包 - auto header = (RtpHeader *) buf->data(); - sendRtcp(ntohl(header->ssrc), addr, addr_len); + ~RtcpHelper() { + if (_process) { + // 删除rtp处理器 + RtpSelector::Instance().delProcess(_stream_id, _process.get()); + } } - void startRtcp(){ + void setOnDetach(function cb) { + if (_process) { + _process->setOnDetach(std::move(cb)); + } else { + _on_detach = std::move(cb); + } + } + + void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { + if (!_process) { + _process = RtpSelector::Instance().getProcess(_stream_id, true); + _process->setOnDetach(std::move(_on_detach)); + } + _process->inputRtp(true, sock, buf->data(), buf->size(), addr); + + // 统计rtp接受情况,用于发送rr包 + auto header = (RtpHeader *)buf->data(); + sendRtcp(ntohl(header->ssrc), addr); + } + + void startRtcp() { weak_ptr weak_self = shared_from_this(); _rtcp_sock->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { - //用于接受rtcp打洞包 + // 用于接受rtcp打洞包 auto strong_self = weak_self.lock(); - if (!strong_self) { + if (!strong_self || !strong_self->_process) { return; } if (!strong_self->_rtcp_addr) { - //只设置一次rtcp对端端口 + // 只设置一次rtcp对端端口 strong_self->_rtcp_addr = std::make_shared(); memcpy(strong_self->_rtcp_addr.get(), addr, addr_len); } @@ -63,30 +84,32 @@ public: } private: - void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr, int addr_len){ - //每5秒发送一次rtcp - if (_ticker.elapsedTime() < 5000) { + void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) { + // 每5秒发送一次rtcp + if (_ticker.elapsedTime() < 5000 || !_process) { return; } _ticker.resetTime(); auto rtcp_addr = (struct sockaddr *)_rtcp_addr.get(); if (!rtcp_addr) { - //默认的,rtcp端口为rtp端口+1 - switch(addr->sa_family){ - case AF_INET: ((sockaddr_in *) addr)->sin_port = htons(ntohs(((sockaddr_in *) addr)->sin_port) + 1); break; - case AF_INET6: ((sockaddr_in6 *) addr)->sin6_port = htons(ntohs(((sockaddr_in6 *) addr)->sin6_port) + 1); break; + // 默认的,rtcp端口为rtp端口+1 + switch (addr->sa_family) { + case AF_INET: ((sockaddr_in *)addr)->sin_port = htons(ntohs(((sockaddr_in *)addr)->sin_port) + 1); break; + case AF_INET6: ((sockaddr_in6 *)addr)->sin6_port = htons(ntohs(((sockaddr_in6 *)addr)->sin6_port) + 1); break; } - //未收到rtcp打洞包时,采用默认的rtcp端口 + // 未收到rtcp打洞包时,采用默认的rtcp端口 rtcp_addr = addr; } - _rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr, addr_len); + _rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr); } private: Ticker _ticker; Socket::Ptr _rtcp_sock; RtpProcess::Ptr _process; + std::string _stream_id; + function _on_detach; std::shared_ptr _rtcp_addr; }; @@ -127,25 +150,20 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ //创建udp服务器 UdpServer::Ptr udp_server; - RtpProcess::Ptr process; + RtcpHelper::Ptr helper; if (!stream_id.empty()) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) - if (tcp_mode == NONE) { - process = RtpSelector::Instance().getProcess(stream_id, true); - RtcpHelper::Ptr helper = std::make_shared(std::move(rtcp_socket), process); - helper->startRtcp(); - rtp_socket->setOnRead( - [rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { - RtpHeader *header = (RtpHeader *)buf->data(); - auto rtp_ssrc = ntohl(header->ssrc); - if (ssrc && rtp_ssrc != ssrc) { - WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc; - } else { - process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr); - helper->onRecvRtp(buf, addr, addr_len); - } - }); - } + helper = std::make_shared(std::move(rtcp_socket), stream_id); + helper->startRtcp(); + rtp_socket->setOnRead([rtp_socket, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + RtpHeader *header = (RtpHeader *)buf->data(); + auto rtp_ssrc = ntohl(header->ssrc); + if (ssrc && rtp_ssrc != ssrc) { + WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc; + } else { + helper->onRecvRtp(rtp_socket, buf, addr); + } + }); } else { #if 1 //单端口多线程接收多个流,根据ssrc区分流 @@ -162,26 +180,22 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ #endif } - _on_cleanup = [rtp_socket, process, stream_id]() { + _on_cleanup = [rtp_socket, stream_id]() { if (rtp_socket) { //去除循环引用 rtp_socket->setOnRead(nullptr); } - if (process) { - //删除rtp处理器 - RtpSelector::Instance().delProcess(stream_id, process.get()); - } }; _tcp_server = tcp_server; _udp_server = udp_server; _rtp_socket = rtp_socket; - _rtp_process = process; + _rtcp_helper = helper; } -void RtpServer::setOnDetach(const function &cb) { - if (_rtp_process) { - _rtp_process->setOnDetach(cb); +void RtpServer::setOnDetach(function cb) { + if (_rtcp_helper) { + _rtcp_helper->setOnDetach(std::move(cb)); } } diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index ff2480be..7c595591 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -18,7 +18,9 @@ #include "Network/UdpServer.h" #include "RtpSession.h" -namespace mediakit{ +namespace mediakit { + +class RtcpHelper; /** * RTP服务器,支持UDP/TCP @@ -60,7 +62,7 @@ public: /** * 设置RtpProcess onDetach事件回调 */ - void setOnDetach(const std::function &cb); + void setOnDetach(std::function cb); private: // tcp主动模式连接服务器成功回调 @@ -70,7 +72,7 @@ protected: toolkit::Socket::Ptr _rtp_socket; toolkit::UdpServer::Ptr _udp_server; toolkit::TcpServer::Ptr _tcp_server; - RtpProcess::Ptr _rtp_process; + std::shared_ptr _rtcp_helper; std::function _on_cleanup; //用于tcp主动模式