diff --git a/src/Common/config.h b/src/Common/config.h index 98ab289c..3dc092ea 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -330,6 +330,8 @@ extern const std::string kRtpMaxSize; extern const std::string kLowLatency; //H264 rtp打包模式是否采用stap-a模式(为了在老版本浏览器上兼容webrtc)还是采用Single NAL unit packet per H.264 模式 extern const std::string kH264StapA; +// rtp server app名称,默认 rtp +extern const std::string kRtpAppName; } // namespace Rtp ////////////组播配置/////////// diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b0161d91..dccb345f 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -19,17 +19,18 @@ using namespace std; using namespace toolkit; -static constexpr char kRtpAppName[] = "rtp"; +static constexpr char kRtpSchemaName[] = "rtp"; //在创建_muxer对象前(也就是推流鉴权成功前),需要先缓存frame,这样可以防止丢包,提高体验 //但是同时需要控制缓冲长度,防止内存溢出。200帧数据,大概有10秒数据,应该足矣等待鉴权hook返回 static constexpr size_t kMaxCachedFrame = 200; namespace mediakit { -RtpProcess::RtpProcess(const string &stream_id) { - _media_info.schema = kRtpAppName; +RtpProcess::RtpProcess(const string &app_name, const string &stream_id) { + GET_CONFIG(string, kRtpAppName, Rtp::kRtpAppName); + _media_info.schema = kRtpSchemaName; _media_info.vhost = DEFAULT_VHOST; - _media_info.app = kRtpAppName; + _media_info.app = app_name.length() > 0 ? app_name : kRtpAppName; _media_info.stream = stream_id; GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index b680936c..066a54da 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -22,7 +22,7 @@ class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, pu public: using Ptr = std::shared_ptr; friend class RtpProcessHelper; - RtpProcess(const std::string &stream_id); + RtpProcess(const std::string &app_name, const std::string &stream_id); ~RtpProcess(); enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 }; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 1eb3058a..3d46b898 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -35,8 +35,9 @@ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ return true; } -RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { +RtpProcess::Ptr RtpSelector::getProcess(const string &app_name, const string &stream_id,bool makeNew) { lock_guard lck(_mtx_map); + string app_name_origin = app_name; string stream_id_origin = stream_id; auto it_replace = _map_stream_replace.find(stream_id); if (it_replace != _map_stream_replace.end()) { @@ -53,7 +54,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { } RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin]; if (!ref) { - ref = std::make_shared(stream_id_origin, shared_from_this()); + ref = std::make_shared(app_name_origin, stream_id_origin, shared_from_this()); ref->attachEvent(); createTimer(); } @@ -128,10 +129,11 @@ void RtpSelector::onManager() { }); } -RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr &parent) { +RtpProcessHelper::RtpProcessHelper(const string &app_name, const string &stream_id, const weak_ptr &parent) { + _app_name = app_name; _stream_id = stream_id; _parent = parent; - _process = std::make_shared(stream_id); + _process = std::make_shared(app_name, stream_id); } RtpProcessHelper::~RtpProcessHelper() { diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 4f46e8dc..925e115a 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -24,7 +24,7 @@ class RtpSelector; class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - RtpProcessHelper(const std::string &stream_id, const std::weak_ptr &parent); + RtpProcessHelper(const std::string &app_name, const std::string &stream_id, const std::weak_ptr &parent); ~RtpProcessHelper(); void attachEvent(); RtpProcess::Ptr & getProcess(); @@ -34,6 +34,7 @@ protected: bool close(MediaSource &sender) override; private: + std::string _app_name; std::string _stream_id; RtpProcess::Ptr _process; std::weak_ptr _parent; @@ -57,11 +58,12 @@ public: /** * 获取一个rtp处理器 + * @param app_name app名称 为""使用默认配置 rtp * @param stream_id 流id * @param makeNew 不存在时是否新建, 该参数为true时,必须确保之前未创建同名对象 * @return rtp处理器 */ - RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew); + RtpProcess::Ptr getProcess(const std::string &app_name, const std::string &stream_id, bool makeNew); /** * 删除rtp处理器 diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 84809488..28b77683 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -30,8 +30,9 @@ class RtcpHelper: public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) { + RtcpHelper(Socket::Ptr rtcp_sock, std::string app_name, std::string stream_id) { _rtcp_sock = std::move(rtcp_sock); + _app_name = app_name; _stream_id = std::move(stream_id); } @@ -60,7 +61,7 @@ public: void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { if (!_process) { - _process = RtpSelector::Instance().getProcess(_stream_id, true); + _process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track); _process->setOnDetach(std::move(_on_detach)); cancelDelayTask(); @@ -96,7 +97,7 @@ public: GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec); _delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() { if (auto strong_self = weak_self.lock()) { - auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false); + auto process = RtpSelector::Instance().getProcess(strong_self->_app_name, strong_self->_stream_id, false); if (!process && strong_self->_on_detach) { strong_self->_on_detach(); } @@ -119,6 +120,10 @@ public: } } + string getAppName() { return _app_name; } + + string getStreamId() { return _stream_id; } + private: void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) { // 每5秒发送一次rtcp @@ -150,13 +155,14 @@ private: Ticker _ticker; Socket::Ptr _rtcp_sock; RtpProcess::Ptr _process; + std::string _app_name; std::string _stream_id; function _on_detach; std::shared_ptr _rtcp_addr; EventPoller::DelayTask::Ptr _delay_task; }; -void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { +void RtpServer::start(uint16_t local_port, const string &app_name, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { //创建udp服务器 Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); @@ -199,7 +205,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ //增加了多路复用判断,如果多路复用为true,就走else逻辑,同时保留了原来stream_id为空走else逻辑 if (!stream_id.empty() && !multiplex) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) - helper = std::make_shared(std::move(rtcp_socket), stream_id); + helper = std::make_shared(std::move(rtcp_socket), app_name, stream_id); helper->startRtcp(); helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track); bool bind_peer_addr = false; @@ -252,6 +258,14 @@ uint16_t RtpServer::getPort() { return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port(); } +std::string RtpServer::getAppName() { + return _rtcp_helper ? _rtcp_helper->getAppName() : ""; +} + +std::string RtpServer::getStreamId() { + return _rtcp_helper ? _rtcp_helper->getStreamId() : ""; +} + void RtpServer::connectToServer(const std::string &url, uint16_t port, const function &cb) { if (_tcp_mode != ACTIVE || !_rtp_socket) { cb(SockException(Err_other, "仅支持tcp主动模式")); diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 3654828e..1b10ba33 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -43,7 +43,8 @@ public: * @param ssrc 指定的ssrc * @param multiplex 多路复用 */ - void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, + void start( + uint16_t local_port, const std::string &app_name = "", const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false); /** @@ -59,6 +60,16 @@ public: */ uint16_t getPort(); + /** + * 获取绑定的app名称 + */ + std::string getAppName(); + + /** + * 获取绑定的stream id + */ + std::string getStreamId(); + /** * 设置RtpProcess onDetach事件回调 */ diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 95807637..058b0df3 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -21,6 +21,7 @@ using namespace toolkit; namespace mediakit{ +const string RtpSession::kAppName = "app_name"; const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kSSRC = "ssrc"; const string RtpSession::kOnlyTrack = "only_track"; @@ -31,6 +32,7 @@ void RtpSession::attachServer(const Server &server) { } void RtpSession::setParams(mINI &ini) { + _app_name = ini[kAppName]; _stream_id = ini[kStreamID]; _ssrc = ini[kSSRC]; _only_track = ini[kOnlyTrack]; @@ -114,7 +116,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { _stream_id = printSSRC(_ssrc); } try { - _process = RtpSelector::Instance().getProcess(_stream_id, true); + _process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true); } catch (RtpSelector::ProcessExisted &ex) { if (!_is_udp) { // tcp情况下立即断开连接 diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 2bff4f5f..53f33125 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -22,6 +22,7 @@ namespace mediakit{ class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent { public: + static const std::string kAppName; static const std::string kStreamID; static const std::string kSSRC; static const std::string kOnlyTrack; @@ -55,6 +56,7 @@ private: int _only_track = 0; uint32_t _ssrc = 0; toolkit::Ticker _ticker; + std::string _app_name; std::string _stream_id; struct sockaddr_storage _addr; RtpProcess::Ptr _process; diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 9bb7e4bc..8f762356 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -42,7 +42,7 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) { memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; auto sock = Socket::createSocket(poller); - auto process = RtpSelector::Instance().getProcess("test", true); + auto process = RtpSelector::Instance().getProcess("", "test", true); uint64_t stamp_last = 0; auto total_size = std::make_shared(0);