From 66b18bc7f876ab7fe717337a5862fba3b0664d5f Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Wed, 3 Apr 2024 14:26:15 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=9D=9Ewindows=E4=BD=BF?= =?UTF-8?q?=E7=94=A8iconv=E8=BD=AC=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/strCoding.cpp | 124 +++++++++++++++++++++++++++++++++------ src/Common/strCoding.h | 4 +- 2 files changed, 107 insertions(+), 21 deletions(-) diff --git a/src/Common/strCoding.cpp b/src/Common/strCoding.cpp index 1a0f0236..ecedce13 100644 --- a/src/Common/strCoding.cpp +++ b/src/Common/strCoding.cpp @@ -153,23 +153,95 @@ void Gb2312ToUnicode(wchar_t* pOut, const char *gbBuffer) { MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, gbBuffer, 2, pOut, 1); } +#else +#include +// 将 GB2312 编码的字符串转换为 UTF-8 编码 +char *gb2312_to_utf8(const char *gb2312_string) { + size_t in_len = strlen(gb2312_string); + size_t out_len = in_len * 4; // UTF-8 最多需要 4 倍空间 + + iconv_t cd = iconv_open("UTF-8", "GBK");//GBK 是在 GB2312 的基础上进行扩展的字符集,包含了 GB2312 中的所有字符 + if (cd == (iconv_t)-1) { + perror("iconv_open"); + return NULL; + } + + char *inbuf = (char *)gb2312_string; + char *outbuf = (char *)malloc(out_len + 1); // 分配足够的空间来存储转换后的字符串 + if (outbuf == NULL) { + perror("malloc"); + iconv_close(cd); + return NULL; + } + memset(outbuf, 0, out_len + 1); + + char *inptr = inbuf; + char *outptr = outbuf; + + if (iconv(cd, &inptr, &in_len, &outptr, &out_len) == (size_t)-1) { + perror("iconv"); + free(outbuf); + iconv_close(cd); + return NULL; + } + + iconv_close(cd); + + return outbuf; +} + +// 跨平台的 UTF-8 转 GB2312 编码 +char *utf8_to_gb2312(const char *utf8_string) { + char *result = NULL; + // 非 Windows 平台使用 iconv 函数进行编码转换 + size_t in_len = strlen(utf8_string); + size_t out_len = in_len * 4; // GB2312 最多需要 4 倍空间 + + iconv_t cd = iconv_open("GBK", "UTF-8"); + if (cd == (iconv_t)-1) { + perror("iconv_open"); + return NULL; + } + + char *inbuf = (char *)utf8_string; + char *outbuf = (char *)malloc(out_len + 1); // 分配足够的空间来存储转换后的字符串 + if (outbuf == NULL) { + perror("malloc"); + iconv_close(cd); + return NULL; + } + memset(outbuf, 0, out_len + 1); + + char *inptr = inbuf; + char *outptr = outbuf; + + if (iconv(cd, &inptr, &in_len, &outptr, &out_len) == (size_t)-1) { + perror("iconv"); + free(outbuf); + iconv_close(cd); + return NULL; + } + + iconv_close(cd); + + result = outbuf; + return result; +} +#endif//defined(_WIN32) string strCoding::UTF8ToGB2312(const string &str) { +#ifdef WIN32 auto len = str.size(); auto pText = str.data(); - char Ctemp[4] = {0}; + char Ctemp[4] = { 0 }; char *pOut = new char[len + 1]; memset(pOut, 0, len + 1); int i = 0, j = 0; - while (i < len) - { - if (pText[i] >= 0) - { + while (i < len) { + if (pText[i] >= 0) { pOut[j++] = pText[i++]; - } - else - { + } else { wchar_t Wtemp; UTF8ToUnicode(&Wtemp, pText + i); UnicodeToGB2312(Ctemp, Wtemp); @@ -182,25 +254,31 @@ string strCoding::UTF8ToGB2312(const string &str) { string ret = pOut; delete[] pOut; return ret; +#else + char *gb2312_string = utf8_to_gb2312(str.c_str()); + if (gb2312_string == NULL) { + return ""; + } + string result(gb2312_string); + free(gb2312_string); + return result; +#endif } string strCoding::GB2312ToUTF8(const string &str) { +#ifdef WIN32 auto len = str.size(); auto pText = str.data(); char buf[4] = { 0 }; auto nLength = len * 3; - char* pOut = new char[nLength]; + char *pOut = new char[nLength]; memset(pOut, 0, nLength); size_t i = 0, j = 0; - while (i < len) - { - //如果是英文直接复制就可以 - if (*(pText + i) >= 0) - { + while (i < len) { + // 如果是英文直接复制就可以 + if (*(pText + i) >= 0) { pOut[j++] = pText[i++]; - } - else - { + } else { wchar_t pbuffer; Gb2312ToUnicode(&pbuffer, pText + i); UnicodeToUTF8(buf, &pbuffer); @@ -210,11 +288,19 @@ string strCoding::GB2312ToUTF8(const string &str) { j += 3; i += 2; } - } + } string ret = pOut; delete[] pOut; return ret; +#else + char *utf8_string = gb2312_to_utf8(str.c_str()); + if (utf8_string == NULL) { + return ""; + } + string result(utf8_string); + free(utf8_string); + return result; +#endif } -#endif//defined(_WIN32) } /* namespace mediakit */ diff --git a/src/Common/strCoding.h b/src/Common/strCoding.h index e715e74d..ad04ed03 100644 --- a/src/Common/strCoding.h +++ b/src/Common/strCoding.h @@ -22,10 +22,10 @@ public: static std::string UrlEncodeComponent(const std::string &str); // url参数 utf8编码 static std::string UrlDecodePath(const std::string &str); //url路径 utf8解码 static std::string UrlDecodeComponent(const std::string &str); // url参数 utf8解码 -#if defined(_WIN32) + static std::string UTF8ToGB2312(const std::string &str);//utf_8转为gb2312 static std::string GB2312ToUTF8(const std::string &str); //gb2312 转utf_8 -#endif//defined(_WIN32) + private: strCoding(void); virtual ~strCoding(void); From e9887c6cd979f91536d2160a43be5e749810a9be Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Wed, 3 Apr 2024 14:27:18 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E5=A2=9E=E5=8A=A0checkHost=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/Parser.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Common/Parser.cpp b/src/Common/Parser.cpp index 7e7de860..cf50222b 100644 --- a/src/Common/Parser.cpp +++ b/src/Common/Parser.cpp @@ -302,6 +302,8 @@ void RtspUrl::setup(bool is_ssl, const string &url, const string &user, const st } static void inline checkHost(std::string &host) { + if (host.size() == 0) + return; if (host.back() == ']' && host.front() == '[') { // ipv6去除方括号 host.pop_back(); From 584944d7775baef81a995ac0936e3ef3f0d92b8f Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Wed, 3 Apr 2024 14:35:13 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E5=B1=8F=E8=94=BD=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89WebSocketSessionBase=E5=88=9B?= =?UTF-8?q?=E5=BB=BAhttp=20server=E6=97=B6ws=E6=94=B6=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=B4=A9=E6=BA=83=E9=97=AE=E9=A2=98=EF=BC=88=E5=8E=9F=E5=9B=A0?= =?UTF-8?q?=E6=9C=AA=E5=AE=9A=E4=BD=8D=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/WebSocketSession.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Http/WebSocketSession.h b/src/Http/WebSocketSession.h index 93bb820e..fc34adb0 100644 --- a/src/Http/WebSocketSession.h +++ b/src/Http/WebSocketSession.h @@ -203,7 +203,8 @@ protected: } //分片缓存太大,需要清空 } - + if (!_session) + break; //最后一个包 if (_payload_cache.empty()) { //这个包是唯一个分片 From 6b1b9f3e35bfba56b30f8cccc43d5c1135a35413 Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Wed, 3 Apr 2024 14:46:33 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0track=E5=9C=A8=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E5=90=8Einput=20frame=E5=89=8D=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=EF=BC=8C=E9=81=BF=E5=85=8Dtrack=E8=AE=A2=E9=98=85=E9=94=99?= =?UTF-8?q?=E8=BF=87=E9=85=8D=E7=BD=AE=E5=B8=A7=E6=95=B0=E6=8D=AE=EF=BC=8C?= =?UTF-8?q?MP4Demuxer=E6=8E=A5=E5=8F=A3=E6=B7=BB=E5=8A=A0virtual=E5=8F=AF?= =?UTF-8?q?=E6=B4=BE=E7=94=9F=E6=89=A9=E5=B1=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Record/MP4Demuxer.cpp | 9 ++++++++- src/Record/MP4Demuxer.h | 17 ++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/Record/MP4Demuxer.cpp b/src/Record/MP4Demuxer.cpp index 4d6800d2..24657b99 100644 --- a/src/Record/MP4Demuxer.cpp +++ b/src/Record/MP4Demuxer.cpp @@ -63,6 +63,8 @@ void MP4Demuxer::onVideoTrack(uint32_t track, uint8_t object, int width, int hei } video->setIndex(track); _tracks.emplace(track, video); + if (_on_track_callback) + _on_track_callback(video); if (extra && bytes) { video->setExtraData((uint8_t *)extra, bytes); } @@ -75,6 +77,8 @@ void MP4Demuxer::onAudioTrack(uint32_t track, uint8_t object, int channel_count, } audio->setIndex(track); _tracks.emplace(track, audio); + if (_on_track_callback) + _on_track_callback(audio); if (extra && bytes) { audio->setExtraData((uint8_t *)extra, bytes); } @@ -100,7 +104,10 @@ struct Context { Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) { keyFrame = false; eof = false; - + if (!_mov_reader) { + eof = true; + return nullptr; + } static mov_reader_onread2 mov_onalloc = [](void *param, uint32_t track_id, size_t bytes, int64_t pts, int64_t dts, int flags) -> void * { Context *ctx = (Context *) param; ctx->pts = pts; diff --git a/src/Record/MP4Demuxer.h b/src/Record/MP4Demuxer.h index 889f91fc..c8b34c7b 100644 --- a/src/Record/MP4Demuxer.h +++ b/src/Record/MP4Demuxer.h @@ -20,25 +20,25 @@ class MP4Demuxer : public TrackSource { public: using Ptr = std::shared_ptr; - ~MP4Demuxer() override; + virtual ~MP4Demuxer() override; /** * 打开文件 * @param file mp4文件路径 */ - void openMP4(const std::string &file); + virtual void openMP4(const std::string &file); /** * @brief 关闭 mp4 文件 */ - void closeMP4(); + virtual void closeMP4(); /** * 移动时间轴至某处 * @param stamp_ms 预期的时间轴位置,单位毫秒 * @return 时间轴位置 */ - int64_t seekTo(int64_t stamp_ms); + virtual int64_t seekTo(int64_t stamp_ms); /** * 读取一帧数据 @@ -46,20 +46,22 @@ public: * @param eof 是否文件读取完毕 * @return 帧数据,可能为空 */ - Frame::Ptr readFrame(bool &keyFrame, bool &eof); + virtual Frame::Ptr readFrame(bool &keyFrame, bool &eof); /** * 获取所有Track信息 * @param trackReady 是否要求track为就绪状态 * @return 所有Track */ - std::vector getTracks(bool trackReady) const override; + virtual std::vector getTracks(bool trackReady) const override; /** * 获取文件长度 * @return 文件长度,单位毫秒 */ - uint64_t getDurationMS() const; + virtual uint64_t getDurationMS() const; + + virtual void setOnTrack(const std::function &callback) { _on_track_callback = callback; } private: int getAllTracks(); @@ -73,6 +75,7 @@ private: uint64_t _duration_ms = 0; std::unordered_map _tracks; toolkit::ResourcePool _buffer_pool; + std::function _on_track_callback; }; From e7eae118917bad9732f79930b2f6471d81e853b9 Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Wed, 3 Apr 2024 15:46:02 +0800 Subject: [PATCH 5/8] =?UTF-8?q?rtsp=E7=82=B9=E6=92=AD=E6=81=A2=E5=A4=8D?= =?UTF-8?q?=E6=97=B6=E4=B8=8D=E5=B8=A6range?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtspPlayer.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 1cf393e2..5e9f273b 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -426,9 +426,8 @@ void RtspPlayer::sendPause(int type, uint32_t seekMS) { // 开启或暂停rtsp switch (type) { case type_pause: sendRtspRequest("PAUSE", _control_url, {}); break; - case type_play: - // sendRtspRequest("PLAY", _content_base); - // break; + case type_play: sendRtspRequest("PLAY", _control_url); + break; case type_seek: sendRtspRequest("PLAY", _control_url, { "Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-" }); break; @@ -440,7 +439,7 @@ void RtspPlayer::sendPause(int type, uint32_t seekMS) { } void RtspPlayer::pause(bool bPause) { - sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond()); + sendPause(bPause ? type_pause : type_play, getProgressMilliSecond()); } void RtspPlayer::speed(float speed) { From d7715303164dd08a29acd31d8900b1e45c2984e7 Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Sun, 7 Apr 2024 13:59:57 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0rtp=20server=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=87=AA=E5=AE=9A=E4=B9=89app=E5=90=8D=E7=A7=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/config.h | 2 ++ src/Rtp/RtpProcess.cpp | 9 +++++---- src/Rtp/RtpProcess.h | 2 +- src/Rtp/RtpSelector.cpp | 10 ++++++---- src/Rtp/RtpSelector.h | 6 ++++-- src/Rtp/RtpServer.cpp | 24 +++++++++++++++++++----- src/Rtp/RtpServer.h | 13 ++++++++++++- src/Rtp/RtpSession.cpp | 4 +++- src/Rtp/RtpSession.h | 2 ++ tests/test_rtp.cpp | 2 +- 10 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/Common/config.h b/src/Common/config.h index 98ab289c..3dc092ea 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -330,6 +330,8 @@ extern const std::string kRtpMaxSize; extern const std::string kLowLatency; //H264 rtp打包模式是否采用stap-a模式(为了在老版本浏览器上兼容webrtc)还是采用Single NAL unit packet per H.264 模式 extern const std::string kH264StapA; +// rtp server app名称,默认 rtp +extern const std::string kRtpAppName; } // namespace Rtp ////////////组播配置/////////// diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b0161d91..dccb345f 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -19,17 +19,18 @@ using namespace std; using namespace toolkit; -static constexpr char kRtpAppName[] = "rtp"; +static constexpr char kRtpSchemaName[] = "rtp"; //在创建_muxer对象前(也就是推流鉴权成功前),需要先缓存frame,这样可以防止丢包,提高体验 //但是同时需要控制缓冲长度,防止内存溢出。200帧数据,大概有10秒数据,应该足矣等待鉴权hook返回 static constexpr size_t kMaxCachedFrame = 200; namespace mediakit { -RtpProcess::RtpProcess(const string &stream_id) { - _media_info.schema = kRtpAppName; +RtpProcess::RtpProcess(const string &app_name, const string &stream_id) { + GET_CONFIG(string, kRtpAppName, Rtp::kRtpAppName); + _media_info.schema = kRtpSchemaName; _media_info.vhost = DEFAULT_VHOST; - _media_info.app = kRtpAppName; + _media_info.app = app_name.length() > 0 ? app_name : kRtpAppName; _media_info.stream = stream_id; GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index b680936c..066a54da 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -22,7 +22,7 @@ class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, pu public: using Ptr = std::shared_ptr; friend class RtpProcessHelper; - RtpProcess(const std::string &stream_id); + RtpProcess(const std::string &app_name, const std::string &stream_id); ~RtpProcess(); enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 }; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 1eb3058a..3d46b898 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -35,8 +35,9 @@ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){ return true; } -RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { +RtpProcess::Ptr RtpSelector::getProcess(const string &app_name, const string &stream_id,bool makeNew) { lock_guard lck(_mtx_map); + string app_name_origin = app_name; string stream_id_origin = stream_id; auto it_replace = _map_stream_replace.find(stream_id); if (it_replace != _map_stream_replace.end()) { @@ -53,7 +54,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { } RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin]; if (!ref) { - ref = std::make_shared(stream_id_origin, shared_from_this()); + ref = std::make_shared(app_name_origin, stream_id_origin, shared_from_this()); ref->attachEvent(); createTimer(); } @@ -128,10 +129,11 @@ void RtpSelector::onManager() { }); } -RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr &parent) { +RtpProcessHelper::RtpProcessHelper(const string &app_name, const string &stream_id, const weak_ptr &parent) { + _app_name = app_name; _stream_id = stream_id; _parent = parent; - _process = std::make_shared(stream_id); + _process = std::make_shared(app_name, stream_id); } RtpProcessHelper::~RtpProcessHelper() { diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 4f46e8dc..925e115a 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -24,7 +24,7 @@ class RtpSelector; class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - RtpProcessHelper(const std::string &stream_id, const std::weak_ptr &parent); + RtpProcessHelper(const std::string &app_name, const std::string &stream_id, const std::weak_ptr &parent); ~RtpProcessHelper(); void attachEvent(); RtpProcess::Ptr & getProcess(); @@ -34,6 +34,7 @@ protected: bool close(MediaSource &sender) override; private: + std::string _app_name; std::string _stream_id; RtpProcess::Ptr _process; std::weak_ptr _parent; @@ -57,11 +58,12 @@ public: /** * 获取一个rtp处理器 + * @param app_name app名称 为""使用默认配置 rtp * @param stream_id 流id * @param makeNew 不存在时是否新建, 该参数为true时,必须确保之前未创建同名对象 * @return rtp处理器 */ - RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew); + RtpProcess::Ptr getProcess(const std::string &app_name, const std::string &stream_id, bool makeNew); /** * 删除rtp处理器 diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 84809488..28b77683 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -30,8 +30,9 @@ class RtcpHelper: public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) { + RtcpHelper(Socket::Ptr rtcp_sock, std::string app_name, std::string stream_id) { _rtcp_sock = std::move(rtcp_sock); + _app_name = app_name; _stream_id = std::move(stream_id); } @@ -60,7 +61,7 @@ public: void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { if (!_process) { - _process = RtpSelector::Instance().getProcess(_stream_id, true); + _process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track); _process->setOnDetach(std::move(_on_detach)); cancelDelayTask(); @@ -96,7 +97,7 @@ public: GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec); _delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() { if (auto strong_self = weak_self.lock()) { - auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false); + auto process = RtpSelector::Instance().getProcess(strong_self->_app_name, strong_self->_stream_id, false); if (!process && strong_self->_on_detach) { strong_self->_on_detach(); } @@ -119,6 +120,10 @@ public: } } + string getAppName() { return _app_name; } + + string getStreamId() { return _stream_id; } + private: void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) { // 每5秒发送一次rtcp @@ -150,13 +155,14 @@ private: Ticker _ticker; Socket::Ptr _rtcp_sock; RtpProcess::Ptr _process; + std::string _app_name; std::string _stream_id; function _on_detach; std::shared_ptr _rtcp_addr; EventPoller::DelayTask::Ptr _delay_task; }; -void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { +void RtpServer::start(uint16_t local_port, const string &app_name, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { //创建udp服务器 Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); @@ -199,7 +205,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ //增加了多路复用判断,如果多路复用为true,就走else逻辑,同时保留了原来stream_id为空走else逻辑 if (!stream_id.empty() && !multiplex) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) - helper = std::make_shared(std::move(rtcp_socket), stream_id); + helper = std::make_shared(std::move(rtcp_socket), app_name, stream_id); helper->startRtcp(); helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track); bool bind_peer_addr = false; @@ -252,6 +258,14 @@ uint16_t RtpServer::getPort() { return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port(); } +std::string RtpServer::getAppName() { + return _rtcp_helper ? _rtcp_helper->getAppName() : ""; +} + +std::string RtpServer::getStreamId() { + return _rtcp_helper ? _rtcp_helper->getStreamId() : ""; +} + void RtpServer::connectToServer(const std::string &url, uint16_t port, const function &cb) { if (_tcp_mode != ACTIVE || !_rtp_socket) { cb(SockException(Err_other, "仅支持tcp主动模式")); diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 3654828e..1b10ba33 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -43,7 +43,8 @@ public: * @param ssrc 指定的ssrc * @param multiplex 多路复用 */ - void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, + void start( + uint16_t local_port, const std::string &app_name = "", const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false); /** @@ -59,6 +60,16 @@ public: */ uint16_t getPort(); + /** + * 获取绑定的app名称 + */ + std::string getAppName(); + + /** + * 获取绑定的stream id + */ + std::string getStreamId(); + /** * 设置RtpProcess onDetach事件回调 */ diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 95807637..058b0df3 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -21,6 +21,7 @@ using namespace toolkit; namespace mediakit{ +const string RtpSession::kAppName = "app_name"; const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kSSRC = "ssrc"; const string RtpSession::kOnlyTrack = "only_track"; @@ -31,6 +32,7 @@ void RtpSession::attachServer(const Server &server) { } void RtpSession::setParams(mINI &ini) { + _app_name = ini[kAppName]; _stream_id = ini[kStreamID]; _ssrc = ini[kSSRC]; _only_track = ini[kOnlyTrack]; @@ -114,7 +116,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) { _stream_id = printSSRC(_ssrc); } try { - _process = RtpSelector::Instance().getProcess(_stream_id, true); + _process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true); } catch (RtpSelector::ProcessExisted &ex) { if (!_is_udp) { // tcp情况下立即断开连接 diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 2bff4f5f..53f33125 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -22,6 +22,7 @@ namespace mediakit{ class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent { public: + static const std::string kAppName; static const std::string kStreamID; static const std::string kSSRC; static const std::string kOnlyTrack; @@ -55,6 +56,7 @@ private: int _only_track = 0; uint32_t _ssrc = 0; toolkit::Ticker _ticker; + std::string _app_name; std::string _stream_id; struct sockaddr_storage _addr; RtpProcess::Ptr _process; diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 9bb7e4bc..8f762356 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -42,7 +42,7 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) { memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; auto sock = Socket::createSocket(poller); - auto process = RtpSelector::Instance().getProcess("test", true); + auto process = RtpSelector::Instance().getProcess("", "test", true); uint64_t stamp_last = 0; auto total_size = std::make_shared(0); From eab1dbc60641bd648f49511d3d957580561285e4 Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Sun, 7 Apr 2024 14:08:05 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E8=A1=A5=E6=8F=90rtp=20app=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E8=87=AA=E5=AE=9A=E4=B9=89=E7=9B=B8=E5=85=B3=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/include/mk_rtp_server.h | 3 ++- api/source/mk_rtp_server.cpp | 4 ++-- server/WebApi.cpp | 18 +++++++++++------- server/WebApi.h | 4 +++- src/Common/config.cpp | 2 ++ 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/api/include/mk_rtp_server.h b/api/include/mk_rtp_server.h index 60cdb43d..f4d96e25 100644 --- a/api/include/mk_rtp_server.h +++ b/api/include/mk_rtp_server.h @@ -20,10 +20,11 @@ typedef struct mk_rtp_server_t *mk_rtp_server; * 创建GB28181 RTP 服务器 * @param port 监听端口,0则为随机 * @param tcp_mode tcp模式(0: 不监听端口 1: 监听端口 2: 主动连接到服务端) + * @param app_name 该端口绑定的app名称,为""使用默认配置 rtp * @param stream_id 该端口绑定的流id * @return */ -API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *stream_id); +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *app_name, const char *stream_id); /** * TCP 主动模式时连接到服务器是否成功的回调 diff --git a/api/source/mk_rtp_server.cpp b/api/source/mk_rtp_server.cpp index 6d2228d2..6bf236e2 100644 --- a/api/source/mk_rtp_server.cpp +++ b/api/source/mk_rtp_server.cpp @@ -16,9 +16,9 @@ using namespace toolkit; #include "Rtp/RtpServer.h" using namespace mediakit; -API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *stream_id) { +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *app_name, const char *stream_id) { RtpServer::Ptr *server = new RtpServer::Ptr(new RtpServer); - (*server)->start(port, stream_id, (RtpServer::TcpMode)tcp_mode); + (*server)->start(port, app_name, stream_id, (RtpServer::TcpMode)tcp_mode); return (mk_rtp_server)server; } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index a1ef1f65..f2b7ce91 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -469,14 +469,14 @@ Value makeMediaSourceJson(MediaSource &media){ } #if defined(ENABLE_RTPPROXY) -uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { +uint16_t openRtpServer(uint16_t local_port, const string &app_name, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { if (s_rtp_server.find(stream_id)) { //为了防止RtpProcess所有权限混乱的问题,不允许重复添加相同的stream_id return 0; } auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) { - server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); + server->start(local_port, app_name, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); }); server->setOnDetach([stream_id]() { //设置rtp超时移除事件 @@ -1192,7 +1192,7 @@ void installWebApi() { CHECK_SECRET(); CHECK_ARGS("stream_id"); - auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); + auto process = RtpSelector::Instance().getProcess(allArgs["app_name"], allArgs["stream_id"], false); if (!process) { val["exist"] = false; return; @@ -1204,6 +1204,7 @@ void installWebApi() { api_regist("/index/api/openRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("port", "stream_id"); + auto app_name = allArgs["app_name"]; // 默认rtp auto stream_id = allArgs["stream_id"]; auto tcp_mode = allArgs["tcp_mode"].as(); if (allArgs["enable_tcp"].as() && !tcp_mode) { @@ -1219,7 +1220,8 @@ void installWebApi() { if (!allArgs["local_ip"].empty()) { local_ip = allArgs["local_ip"]; } - auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, allArgs["re_use_port"].as(), + auto port = openRtpServer( + allArgs["port"], app_name, stream_id, tcp_mode, local_ip, allArgs["re_use_port"].as(), allArgs["ssrc"].as(), only_track); if (port == 0) { throw InvalidArgsException("该stream_id已存在"); @@ -1231,6 +1233,7 @@ void installWebApi() { api_regist("/index/api/openRtpServerMultiplex", [](API_ARGS_MAP) { CHECK_SECRET(); CHECK_ARGS("port", "stream_id"); + auto app_name = allArgs["app_name"]; auto stream_id = allArgs["stream_id"]; auto tcp_mode = allArgs["tcp_mode"].as(); if (allArgs["enable_tcp"].as() && !tcp_mode) { @@ -1246,7 +1249,7 @@ void installWebApi() { if (!allArgs["local_ip"].empty()) { local_ip = allArgs["local_ip"]; } - auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, true, 0, only_track,true); + auto port = openRtpServer(allArgs["port"], app_name, stream_id, tcp_mode, local_ip, true, 0, only_track, true); if (port == 0) { throw InvalidArgsException("该stream_id已存在"); } @@ -1303,6 +1306,7 @@ void installWebApi() { Value obj; obj["stream_id"] = pr.first; obj["port"] = pr.second->getPort(); + obj["app_name"] = pr.second->getAppName(); val["data"].append(obj); } }); @@ -1411,7 +1415,7 @@ void installWebApi() { CHECK_SECRET(); CHECK_ARGS("stream_id"); //只是暂停流的检查,流媒体服务器做为流负载服务,收流就转发,RTSP/RTMP有自己暂停协议 - auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); + auto rtp_process = RtpSelector::Instance().getProcess(allArgs["app"], allArgs["stream_id"], false); if (rtp_process) { rtp_process->setStopCheckRtp(true); } else { @@ -1422,7 +1426,7 @@ void installWebApi() { api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) { CHECK_SECRET(); CHECK_ARGS("stream_id"); - auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); + auto rtp_process = RtpSelector::Instance().getProcess(allArgs["app"], allArgs["stream_id"], false); if (rtp_process) { rtp_process->setStopCheckRtp(false); } else { diff --git a/server/WebApi.h b/server/WebApi.h index 95562bbf..89907d28 100755 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -202,7 +202,9 @@ void installWebApi(); void unInstallWebApi(); #if defined(ENABLE_RTPPROXY) -uint16_t openRtpServer(uint16_t local_port, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex=false); +uint16_t openRtpServer(uint16_t local_port, const std::string &app_name, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex=false); +//void connectRtpServer(const std::string &stream_id, const std::string &dst_url, uint16_t dst_port, const std::function &cb); +//bool closeRtpServer(const std::string &stream_id); #endif Json::Value makeMediaSourceJson(mediakit::MediaSource &media); diff --git a/src/Common/config.cpp b/src/Common/config.cpp index a34ee84f..ca2a6bd2 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -262,6 +262,7 @@ const string kAudioMtuSize = RTP_FIELD "audioMtuSize"; const string kRtpMaxSize = RTP_FIELD "rtpMaxSize"; const string kLowLatency = RTP_FIELD "lowLatency"; const string kH264StapA = RTP_FIELD "h264_stap_a"; +const string kRtpAppName = RTP_FIELD "rtpAppName"; static onceToken token([]() { mINI::Instance()[kVideoMtuSize] = 1400; @@ -269,6 +270,7 @@ static onceToken token([]() { mINI::Instance()[kRtpMaxSize] = 10; mINI::Instance()[kLowLatency] = 0; mINI::Instance()[kH264StapA] = 1; + mINI::Instance()[kRtpAppName] = "rtp"; }); } // namespace Rtp From 353c635852ffb73d4b7f0e240b7ee4ec47075da4 Mon Sep 17 00:00:00 2001 From: baiyfcu Date: Sun, 7 Apr 2024 14:08:50 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E5=AF=B9=E5=A4=96=E6=9A=B4=E9=9C=B2zlm?= =?UTF-8?q?=E7=94=9F=E6=88=90=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 72a4544e..96f85168 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -474,6 +474,7 @@ endif() ############################################################################## # for version.h +update_cached_list(ZLM_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR}) # for assert.h