diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index a4b8b5e0..28b7aea1 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit a4b8b5e00aac6251254a513c7759605c0ba35f90 +Subproject commit 28b7aea107089c17c6f10e8657d27a0815f85b25 diff --git a/AUTHORS b/AUTHORS index a418921d..57928af2 100644 --- a/AUTHORS +++ b/AUTHORS @@ -82,4 +82,6 @@ WuPeng [fruit Juice](https://github.com/xuandu) [tbago](https://github.com/tbago) [Luosh](https://github.com/Luosh) -[linxiaoyan87](https://github.com/linxiaoyan) \ No newline at end of file +[linxiaoyan87](https://github.com/linxiaoyan) +[waken](https://github.com/mc373906408) +[Deepslient](https://github.com/Deepslient) \ No newline at end of file diff --git a/README.md b/README.md index 7def21a2..97fb5751 100644 --- a/README.md +++ b/README.md @@ -327,6 +327,12 @@ bash build_docker_images.sh [tbago](https://github.com/tbago) [Luosh](https://github.com/Luosh) [linxiaoyan87](https://github.com/linxiaoyan) +[waken](https://github.com/mc373906408) +[Deepslient](https://github.com/Deepslient) + +同时感谢JetBrains对开源项目的支持,本项目使用CLion开发与调试: + +[![JetBrains](https://resources.jetbrains.com/storage/products/company/brand/logos/CLion.svg)](https://jb.gg/OpenSourceSupport) ## 使用案例 diff --git a/README_en.md b/README_en.md index c81b6b87..5eee72d1 100644 --- a/README_en.md +++ b/README_en.md @@ -491,6 +491,12 @@ Thanks to all those who have supported this project in various ways, including b [tbago](https://github.com/tbago) [Luosh](https://github.com/Luosh) [linxiaoyan87](https://github.com/linxiaoyan) +[waken](https://github.com/mc373906408) +[Deepslient](https://github.com/Deepslient) + +Also thank to JetBrains for their support for open source project, we developed and debugged zlmediakit with CLion: + +[![JetBrains](https://resources.jetbrains.com/storage/products/company/brand/logos/CLion.svg)](https://jb.gg/OpenSourceSupport) ## Use Cases diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index 018988f4..3dc9a2c3 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -100,6 +100,9 @@ API_EXPORT int API_CALL mk_media_source_get_total_reader_count(const mk_media_so API_EXPORT int API_CALL mk_media_source_get_track_count(const mk_media_source ctx); // copy track reference by index from MediaSource, please use mk_track_unref to release it API_EXPORT mk_track API_CALL mk_media_source_get_track(const mk_media_source ctx, int index); +// MediaSource::broadcastMessage +API_EXPORT int API_CALL mk_media_source_broadcast_msg(const mk_media_source ctx, const char *msg, size_t len); + /** * 直播源在ZLMediaKit中被称作为MediaSource, * 目前支持3种,分别是RtmpMediaSource、RtspMediaSource、HlsMediaSource diff --git a/api/source/mk_common.cpp b/api/source/mk_common.cpp index 1604af61..a45068ce 100644 --- a/api/source/mk_common.cpp +++ b/api/source/mk_common.cpp @@ -159,7 +159,7 @@ API_EXPORT void API_CALL mk_set_option(const char *key, const char *val) { } mINI::Instance()[key] = val; //广播配置文件热加载 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig); + NOTICE_EMIT(BroadcastReloadConfigArgs, Broadcast::kBroadcastReloadConfig); } API_EXPORT const char * API_CALL mk_get_option(const char *key) diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index fd7f696c..3833017f 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -214,6 +214,16 @@ API_EXPORT mk_track API_CALL mk_media_source_get_track(const mk_media_source ctx return (mk_track) new Track::Ptr(std::move(tracks[index])); } +API_EXPORT int API_CALL mk_media_source_broadcast_msg(const mk_media_source ctx, const char *msg, size_t len) { + assert(ctx && msg && len); + MediaSource *src = (MediaSource *)ctx; + + Any any; + Buffer::Ptr buffer = std::make_shared(std::string(msg, len)); + any.set(std::move(buffer)); + return src->broadcastMessage(any); +} + API_EXPORT int API_CALL mk_media_source_close(const mk_media_source ctx,int force){ assert(ctx); MediaSource *src = (MediaSource *)ctx; diff --git a/api/source/mk_util.cpp b/api/source/mk_util.cpp index 6c775529..5c5d3197 100644 --- a/api/source/mk_util.cpp +++ b/api/source/mk_util.cpp @@ -65,7 +65,7 @@ API_EXPORT mk_ini API_CALL mk_ini_default() { static void emit_ini_file_reload(mk_ini ini) { if (ini == mk_ini_default()) { // 广播配置文件热加载 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig); + NOTICE_EMIT(BroadcastReloadConfigArgs, Broadcast::kBroadcastReloadConfig); } } diff --git a/docker/centos7/Dockerfile.runtime b/docker/centos7/Dockerfile.runtime index 8f588de5..c7d9f990 100644 --- a/docker/centos7/Dockerfile.runtime +++ b/docker/centos7/Dockerfile.runtime @@ -128,4 +128,4 @@ WORKDIR /opt/zlm VOLUME [ "/opt/zlm/conf/","/opt/zlm/log/","opt/zlm/ffmpeg/"] COPY --from=build /opt/build / ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH TZ=Asia/Shanghai -CMD ./MediaServer -c ./conf/config.ini \ No newline at end of file +CMD ["./MediaServer", "-c" , "./conf/config.ini"] \ No newline at end of file diff --git a/docker/ubuntu16.04/Dockerfile.devel b/docker/ubuntu16.04/Dockerfile.devel index 476cca53..65ff0958 100644 --- a/docker/ubuntu16.04/Dockerfile.devel +++ b/docker/ubuntu16.04/Dockerfile.devel @@ -41,4 +41,4 @@ RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \ make ENV PATH /opt/media/ZLMediaKit/release/linux/Release/:$PATH -CMD MediaServer +CMD ["MediaServer"] diff --git a/docker/ubuntu16.04/Dockerfile.runtime b/docker/ubuntu16.04/Dockerfile.runtime index 4b84e017..bc869e92 100644 --- a/docker/ubuntu16.04/Dockerfile.runtime +++ b/docker/ubuntu16.04/Dockerfile.runtime @@ -60,4 +60,4 @@ RUN apt-get update && \ WORKDIR /opt/media/bin/ COPY --from=build /opt/media/ZLMediaKit/release/linux/Release/MediaServer /opt/media/bin/MediaServer ENV PATH /opt/media/bin:$PATH -CMD MediaServer +CMD ["MediaServer"] diff --git a/docker/ubuntu18.04/Dockerfile.devel b/docker/ubuntu18.04/Dockerfile.devel index 05109cee..0a61d86c 100644 --- a/docker/ubuntu18.04/Dockerfile.devel +++ b/docker/ubuntu18.04/Dockerfile.devel @@ -42,4 +42,4 @@ RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \ make ENV PATH /opt/media/ZLMediaKit/release/linux/Release:$PATH -CMD MediaServer +CMD ["MediaServer"] diff --git a/docker/ubuntu18.04/Dockerfile.runtime b/docker/ubuntu18.04/Dockerfile.runtime index ce3b6496..0a1bae65 100644 --- a/docker/ubuntu18.04/Dockerfile.runtime +++ b/docker/ubuntu18.04/Dockerfile.runtime @@ -60,4 +60,4 @@ RUN apt-get update && \ WORKDIR /opt/media/bin/ COPY --from=build /opt/media/ZLMediaKit/release/linux/Release/MediaServer /opt/media/bin/MediaServer ENV PATH /opt/media/bin:$PATH -CMD MediaServer +CMD ["MediaServer"] diff --git a/dockerfile b/dockerfile index 3297b86e..84851901 100644 --- a/dockerfile +++ b/dockerfile @@ -83,4 +83,4 @@ COPY --from=build /opt/media/ZLMediaKit/release/linux/${MODEL}/MediaServer /opt/ COPY --from=build /opt/media/ZLMediaKit/release/linux/${MODEL}/config.ini /opt/media/conf/ COPY --from=build /opt/media/ZLMediaKit/www/ /opt/media/bin/www/ ENV PATH /opt/media/bin:$PATH -CMD ["sh","-c","./MediaServer -s default.pem -c ../conf/config.ini -l 0"] +CMD ["./MediaServer","-s", "default.pem", "-c", "../conf/config.ini", "-l","0"] diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index bf0ace5f..fdda9d2a 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -912,6 +912,56 @@ }, "response": [] }, + { + "name": "广播webrtc datachannel消息(broadcastMessage)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/broadcastMessage?secret={{ZLMediaKit_secret}}&schema=rtsp&vhost={{defaultVhost}}&app=live&stream=test&msg=Hello zlmediakit123", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "broadcastMessage" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置)" + }, + { + "key": "schema", + "value": "rtsp", + "description": "协议,例如 rtsp或rtmp,目前仅支持rtsp协议" + }, + { + "key": "vhost", + "value": "{{defaultVhost}}", + "description": "虚拟主机,例如__defaultVhost__" + }, + { + "key": "app", + "value": "live", + "description": "应用名,例如 live" + }, + { + "key": "stream", + "value": "test", + "description": "流id,例如 test" + }, + { + "key": "msg", + "value": "Hello ZLMediakit" + } + ] + } + }, + "response": [] + }, { "name": "获取流信息(getMediaInfo)", "request": { diff --git a/server/Process.cpp b/server/Process.cpp index 7ddebb08..140282d9 100644 --- a/server/Process.cpp +++ b/server/Process.cpp @@ -108,7 +108,7 @@ static int cloneFunc(void *ptr) { #endif -void Process::run(const string &cmd, string &log_file) { +void Process::run(const string &cmd, string log_file) { kill(2000); #ifdef _WIN32 STARTUPINFO si = { 0 }; diff --git a/server/Process.h b/server/Process.h index d514edb5..06f55345 100644 --- a/server/Process.h +++ b/server/Process.h @@ -26,7 +26,7 @@ class Process { public: Process(); ~Process(); - void run(const std::string &cmd, std::string &log_file); + void run(const std::string &cmd, std::string log_file); void kill(int max_delay,bool force = false); bool wait(bool block = true); int exit_code(); diff --git a/server/System.cpp b/server/System.cpp index aae4c792..bc93ed3c 100644 --- a/server/System.cpp +++ b/server/System.cpp @@ -126,6 +126,12 @@ void System::startDaemon(bool &kill_parent_if_failed) { exit(0); }); + signal(SIGTERM,[](int) { + WarnL << "收到主动退出信号,关闭父进程与子进程"; + kill(pid, SIGINT); + exit(0); + }); + do { int status = 0; if (waitpid(pid, &status, 0) >= 0) { diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 6f40403d..e8081756 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -666,7 +666,7 @@ void installWebApi() { ++changed; } if (changed > 0) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig); + NOTICE_EMIT(BroadcastReloadConfigArgs, Broadcast::kBroadcastReloadConfig); ini.dumpFile(g_ini_file); } val["changed"] = changed; @@ -795,25 +795,40 @@ void installWebApi() { throw ApiRetException("can not find the stream", API::NotFound); } src->getPlayerList( - [=](const std::list> &info_list) mutable { + [=](const std::list &info_list) mutable { val["code"] = API::Success; auto &data = val["data"]; data = Value(arrayValue); for (auto &info : info_list) { - auto obj = static_pointer_cast(info); - data.append(std::move(*obj)); + auto &obj = info.get(); + data.append(std::move(obj)); } invoker(200, headerOut, val.toStyledString()); }, - [](std::shared_ptr &&info) -> std::shared_ptr { + [](toolkit::Any &&info) -> toolkit::Any { auto obj = std::make_shared(); - auto session = static_pointer_cast(info); - fillSockInfo(*obj, session.get()); - (*obj)["typeid"] = toolkit::demangle(typeid(*session).name()); - return obj; + auto &sock = info.get(); + fillSockInfo(*obj, &sock); + (*obj)["typeid"] = toolkit::demangle(typeid(sock).name()); + toolkit::Any ret; + ret.set(obj); + return ret; }); }); + api_regist("/index/api/broadcastMessage", [](API_ARGS_MAP) { + CHECK_SECRET(); + CHECK_ARGS("schema", "vhost", "app", "stream", "msg"); + auto src = MediaSource::find(allArgs["schema"], allArgs["vhost"], allArgs["app"], allArgs["stream"]); + if (!src) { + throw ApiRetException("can not find the stream", API::NotFound); + } + Any any; + Buffer::Ptr buffer = std::make_shared(allArgs["msg"]); + any.set(std::move(buffer)); + src->broadcastMessage(any); + }); + //测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); diff --git a/server/main.cpp b/server/main.cpp index f0d0ce1b..70b6348f 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -420,6 +420,12 @@ int start_main(int argc,char *argv[]) { sem.post(); }); // 设置退出信号 + signal(SIGTERM,[](int) { + WarnL << "SIGTERM:exit"; + signal(SIGTERM, SIG_IGN); + sem.post(); + }); + #if !defined(_WIN32) signal(SIGHUP, [](int) { mediakit::loadIniConfig(g_ini_file.data()); }); #endif diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index c0757bf3..1977b616 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -469,7 +469,7 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr &s }); }; //广播未找到流,此时可以立即去拉流,这样还来得及 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastNotFoundStream, info, static_cast(*session), close_player); + NOTICE_EMIT(BroadcastNotFoundStreamArgs, Broadcast::kBroadcastNotFoundStream, info, *session, close_player); } void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr &session, const function &cb) { @@ -499,7 +499,7 @@ void MediaSource::emitEvent(bool regist){ listener->onRegist(*this, regist); } //触发广播 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, regist, *this); + NOTICE_EMIT(BroadcastMediaChangedArgs, Broadcast::kBroadcastMediaChanged, regist, *this); InfoL << (regist ? "媒体注册:" : "媒体注销:") << getUrl(); } @@ -669,7 +669,7 @@ void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){ strong_sender->close(false); } else { // 直播时触发无人观看事件,让开发者自行选择是否关闭 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strong_sender); + NOTICE_EMIT(BroadcastStreamNoneReaderArgs, Broadcast::kBroadcastStreamNoneReader, *strong_sender); } } else { //这个是mp4点播,我们自动关闭 diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 1329b196..bcf75a09 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -347,12 +347,14 @@ public: // 观看者个数,包括(hls/rtsp/rtmp) virtual int totalReaderCount(); // 获取播放器列表 - virtual void getPlayerList(const std::function> &info_list)> &cb, - const std::function(std::shared_ptr &&info)> &on_change) { + virtual void getPlayerList(const std::function &info_list)> &cb, + const std::function &on_change) { assert(cb); - cb(std::list>()); + cb(std::list()); } + virtual bool broadcastMessage(const toolkit::Any &data) { return false; } + // 获取媒体源类型 MediaOriginType getOriginType() const; // 获取媒体源url或者文件路径 diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 0446abf6..bbb9b357 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -312,7 +312,7 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() { WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex; strong_self->_rtp_sender.erase(ssrc); - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); + NOTICE_EMIT(BroadcastSendRtpStoppedArgs, Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex); }); } }); diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 87e4e855..b113a968 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -31,7 +31,7 @@ bool loadIniConfig(const char *ini_path) { } try { mINI::Instance().parseFile(ini); - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig); + NOTICE_EMIT(BroadcastReloadConfigArgs, Broadcast::kBroadcastReloadConfig); return true; } catch (std::exception &) { InfoL << "dump ini file to:" << ini; diff --git a/src/FMP4/FMP4MediaSource.h b/src/FMP4/FMP4MediaSource.h index 53359e30..13674ebe 100644 --- a/src/FMP4/FMP4MediaSource.h +++ b/src/FMP4/FMP4MediaSource.h @@ -51,8 +51,8 @@ public: return _ring; } - void getPlayerList(const std::function> &info_list)> &cb, - const std::function(std::shared_ptr &&info)> &on_change) override { + void getPlayerList(const std::function &info_list)> &cb, + const std::function &on_change) override { _ring->getInfoList(cb, on_change); } diff --git a/src/Http/HttpFileManager.cpp b/src/Http/HttpFileManager.cpp index 66e5f7fa..5577ffbb 100644 --- a/src/Http/HttpFileManager.cpp +++ b/src/Http/HttpFileManager.cpp @@ -250,7 +250,7 @@ static bool emitHlsPlayed(const Parser &parser, const MediaInfo &media_info, con //cookie有效期为kHlsCookieSecond invoker(err, "", kHlsCookieSecond); }; - bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, media_info, auth_invoker, static_cast(sender)); + bool flag = NOTICE_EMIT(BroadcastMediaPlayedArgs, Broadcast::kBroadcastMediaPlayed, media_info, auth_invoker, sender); if (!flag) { //未开启鉴权,那么允许播放 auth_invoker(""); @@ -383,7 +383,7 @@ static void canAccessPath(Session &sender, const Parser &parser, const MediaInfo } // 事件未被拦截,则认为是http下载请求 - bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpAccess, parser, path, is_dir, accessPathInvoker, static_cast(sender)); + bool flag = NOTICE_EMIT(BroadcastHttpAccessArgs, Broadcast::kBroadcastHttpAccess, parser, path, is_dir, accessPathInvoker, sender); if (!flag) { // 此事件无人监听,我们默认都有权限访问 callback("", nullptr); @@ -556,7 +556,7 @@ static string getFilePath(const Parser &parser,const MediaInfo &media_info, Sess } // 替换url,防止返回的目录索引网页被注入非法内容 const_cast(parser).setUrl("/" + ret.substr(http_root.size())); - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpBeforeAccess, parser, ret, static_cast(sender)); + NOTICE_EMIT(BroadcastHttpBeforeAccessArgs, Broadcast::kBroadcastHttpBeforeAccess, parser, ret, sender); return ret; } diff --git a/src/Http/HttpRequester.cpp b/src/Http/HttpRequester.cpp index 3e403df2..8bd09950 100644 --- a/src/Http/HttpRequester.cpp +++ b/src/Http/HttpRequester.cpp @@ -271,7 +271,7 @@ static void sendReport() { } static toolkit::onceToken s_token([]() { - NoticeCenter::Instance().addListener(nullptr, "kBroadcastEventPollerPoolStarted", [](EventPollerPool &pool, size_t &size) { + NoticeCenter::Instance().addListener(nullptr, "kBroadcastEventPollerPoolStarted", [](EventPollerPoolOnStartedArgs) { // 第一次汇报在程序启动后5分钟 pool.getPoller()->doDelayTask(5 * 60 * 1000, []() { sendReport(); diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 145a08a2..69407928 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -170,7 +170,7 @@ void HttpSession::onError(const SockException &err) { GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_total_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration, true, static_cast(*this)); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration, true, *this); } return; } @@ -311,7 +311,7 @@ bool HttpSession::checkLiveStream(const string &schema, const string &url_suffix } }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPlayedArgs, Broadcast::kBroadcastMediaPlayed, _mediaInfo, invoker, *this); if (!flag) { // 该事件无人监听,默认不鉴权 onRes(""); @@ -338,7 +338,11 @@ bool HttpSession::checkLiveStreamFMP4(const function &cb) { weak_ptr weak_self = static_pointer_cast(shared_from_this()); fmp4_src->pause(false); _fmp4_reader = fmp4_src->getRing()->attach(getPoller()); - _fmp4_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _fmp4_reader->setGetInfoCB([weak_self]() { + Any ret; + ret.set(static_pointer_cast(weak_self.lock())); + return ret; + }); _fmp4_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -378,7 +382,11 @@ bool HttpSession::checkLiveStreamTS(const function &cb) { weak_ptr weak_self = static_pointer_cast(shared_from_this()); ts_src->pause(false); _ts_reader = ts_src->getRing()->attach(getPoller()); - _ts_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _ts_reader->setGetInfoCB([weak_self]() { + Any ret; + ret.set(static_pointer_cast(weak_self.lock())); + return ret; + }); _ts_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -711,7 +719,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke) { }; ///////////////////广播HTTP事件/////////////////////////// bool consumed = false; // 该事件是否被消费 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest, _parser, invoker, consumed, static_cast(*this)); + NOTICE_EMIT(BroadcastHttpRequestArgs, Broadcast::kBroadcastHttpRequest, _parser, invoker, consumed, *this); if (!consumed && doInvoke) { // 该事件无人消费,所以返回404 invoker(404, KeyValue(), HttpBody::Ptr()); diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index bbc49df5..2ba99f76 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -149,7 +149,7 @@ void HlsMakerImp::onFlushLastSegment(uint64_t duration_ms) { if (broadcastRecordTs) { _info.time_len = duration_ms / 1000.0f; _info.file_size = File::fileSize(_info.file_path.data()); - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRecordTs, _info); + NOTICE_EMIT(BroadcastRecordTsArgs, Broadcast::kBroadcastRecordTs, _info); } } diff --git a/src/Record/HlsMediaSource.cpp b/src/Record/HlsMediaSource.cpp index f3fe516b..f3adb775 100644 --- a/src/Record/HlsMediaSource.cpp +++ b/src/Record/HlsMediaSource.cpp @@ -33,6 +33,12 @@ void HlsCookieData::addReaderCount() { // HlsMediaSource已经销毁 *added = false; }); + auto info = _sock_info; + _ring_reader->setGetInfoCB([info]() { + Any ret; + ret.set(info); + return ret; + }); } } } @@ -47,7 +53,7 @@ HlsCookieData::~HlsCookieData() { uint64_t bytes = _bytes.load(); if (bytes >= iFlowThreshold * 1024) { try { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _info, bytes, duration, true, static_cast(*_sock_info)); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _info, bytes, duration, true, *_sock_info); } catch (std::exception &ex) { WarnL << "Exception occurred: " << ex.what(); } diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index 94cfdf4d..1e0ecd14 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -58,6 +58,11 @@ public: void onSegmentSize(size_t bytes) { _speed[TrackVideo] += bytes; } + void getPlayerList(const std::function &info_list)> &cb, + const std::function &on_change) override { + _ring->getInfoList(cb, on_change); + } + private: RingType::Ptr _ring; std::string _index_file; diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index ade1d519..5373b8a1 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -92,7 +92,7 @@ void MP4Recorder::asyncClose() { } TraceL << "Emit mp4 record event: " << full_path; //触发mp4录制切片生成事件 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRecordMP4, info); + NOTICE_EMIT(BroadcastRecordMP4Args, Broadcast::kBroadcastRecordMP4, info); }); } diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index bee86ddb..b2d1a6ad 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -46,7 +46,11 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr std::weak_ptr weak_self = getSharedPtr(); media->pause(false); _ring_reader = media->getRing()->attach(poller); - _ring_reader->setGetInfoCB([weak_self]() { return dynamic_pointer_cast(weak_self.lock()); }); + _ring_reader->setGetInfoCB([weak_self]() { + Any ret; + ret.set(dynamic_pointer_cast(weak_self.lock())); + return ret; + }); _ring_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/src/Rtmp/FlvPlayer.cpp b/src/Rtmp/FlvPlayer.cpp index fbefd23e..0095eb04 100644 --- a/src/Rtmp/FlvPlayer.cpp +++ b/src/Rtmp/FlvPlayer.cpp @@ -54,7 +54,10 @@ void FlvPlayer::onResponseCompleted(const SockException &ex) { } void FlvPlayer::onResponseBody(const char *buf, size_t size) { - FlvSplitter::input(buf, size); + if (!_benchmark_mode) { + // 性能测试模式不做数据解析,节省cpu + FlvSplitter::input(buf, size); + } } bool FlvPlayer::onRecvMetadata(const AMFValue &metadata) { @@ -64,6 +67,7 @@ bool FlvPlayer::onRecvMetadata(const AMFValue &metadata) { void FlvPlayer::onRecvRtmpPacket(RtmpPacket::Ptr packet) { if (!_play_result && !packet->isConfigFrame()) { _play_result = true; + _benchmark_mode = (*this)[Client::kBenchmarkMode].as(); onPlayResult(SockException(Err_success, "play http-flv success")); } onRtmpPacket(std::move(packet)); diff --git a/src/Rtmp/FlvPlayer.h b/src/Rtmp/FlvPlayer.h index c49b2b3f..2e79dac2 100644 --- a/src/Rtmp/FlvPlayer.h +++ b/src/Rtmp/FlvPlayer.h @@ -40,6 +40,7 @@ private: private: bool _play_result = false; + bool _benchmark_mode = false; }; using FlvPlayerImp = FlvPlayerBase; diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 718f6055..732cbb85 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -57,8 +57,8 @@ public: return _ring; } - void getPlayerList(const std::function> &info_list)> &cb, - const std::function(std::shared_ptr &&info)> &on_change) override { + void getPlayerList(const std::function &info_list)> &cb, + const std::function &on_change) override { _ring->getInfoList(cb, on_change); } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 8a5d8c79..fa3058be 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -36,7 +36,7 @@ void RtmpSession::onError(const SockException& err) { GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_total_bytes >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, static_cast(*this)); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, *this); } //如果是主动关闭的,那么不延迟注销 @@ -215,7 +215,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { on_res(err, option); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtmp_push, _media_info, invoker, static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPublishArgs, Broadcast::kBroadcastMediaPublish, MediaOriginType::rtmp_push, _media_info, invoker, *this); if(!flag){ //该事件无人监听,默认鉴权成功 on_res("", ProtocolOption()); @@ -306,7 +306,11 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr src->pause(false); _ring_reader = src->getRing()->attach(getPoller()); weak_ptr weak_self = static_pointer_cast(shared_from_this()); - _ring_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _ring_reader->setGetInfoCB([weak_self]() { + Any ret; + ret.set(static_pointer_cast(weak_self.lock())); + return ret; + }); _ring_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -381,7 +385,7 @@ void RtmpSession::doPlay(AMFDecoder &dec){ }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPlayedArgs, Broadcast::kBroadcastMediaPlayed, _media_info, invoker, *this); if (!flag) { // 该事件无人监听,默认不鉴权 doPlayResponse("", [token](bool) {}); diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 0ca3f704..5e61ccf4 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -67,7 +67,7 @@ RtpProcess::~RtpProcess() { GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_total_bytes >= iFlowThreshold * 1024) { try { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, static_cast(*this)); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, false, *this); } catch (std::exception &ex) { WarnL << "Exception occurred: " << ex.what(); } @@ -266,9 +266,9 @@ void RtpProcess::emitOnPublish() { }; //触发推流鉴权事件 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtp_push, _media_info, invoker, static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPublishArgs, Broadcast::kBroadcastMediaPublish, MediaOriginType::rtp_push, _media_info, invoker, *this); if (!flag) { - //该事件无人监听,默认不鉴权 + // 该事件无人监听,默认不鉴权 invoker("", ProtocolOption()); } } diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 59e207ea..78aee8e1 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -102,8 +102,8 @@ public: process->setOnDetach(std::move(strong_self->_on_detach)); } if (!process) { // process 未创建,触发rtp server 超时事件 - NoticeCenter::Instance().emitEvent(Broadcast::KBroadcastRtpServerTimeout, strong_self->_local_port, strong_self->_stream_id, - (int)strong_self->_tcp_mode, strong_self->_re_use_port, strong_self->_ssrc); + NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::KBroadcastRtpServerTimeout, strong_self->_local_port, strong_self->_stream_id, + (int)strong_self->_tcp_mode, strong_self->_re_use_port, strong_self->_ssrc); } } return 0; diff --git a/src/Rtsp/Rtsp.cpp b/src/Rtsp/Rtsp.cpp index 579a881e..daf09680 100644 --- a/src/Rtsp/Rtsp.cpp +++ b/src/Rtsp/Rtsp.cpp @@ -467,7 +467,7 @@ bool isRtp(const char *buf, size_t size) { return false; } RtpHeader *header = (RtpHeader *)buf; - return ((header->pt < 64) || (header->pt >= 96)); + return ((header->pt < 64) || (header->pt >= 96)) && header->version == RtpPacket::kRtpVersion; } bool isRtcp(const char *buf, size_t size) { diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index f70396d8..bac7fcf9 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -53,11 +53,18 @@ public: return _ring; } - void getPlayerList(const std::function> &info_list)> &cb, - const std::function(std::shared_ptr &&info)> &on_change) override { + void getPlayerList(const std::function &info_list)> &cb, + const std::function &on_change) override { + assert(_ring); _ring->getInfoList(cb, on_change); } + bool broadcastMessage(const toolkit::Any &data) override { + assert(_ring); + _ring->sendMessage(data); + return true; + } + /** * 获取播放器个数 */ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index a660e6be..d780a66d 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -80,7 +80,7 @@ void RtspSession::onError(const SockException &err) { //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, is_player, static_cast(*this)); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, is_player, *this); } //如果是主动关闭的,那么不延迟注销 @@ -294,7 +294,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { }; //rtsp推流需要鉴权 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtsp_push, _media_info, invoker, static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPublishArgs, Broadcast::kBroadcastMediaPublish, MediaOriginType::rtsp_push, _media_info, invoker, *this); if (!flag) { //该事件无人监听,默认不鉴权 onRes("", ProtocolOption()); @@ -352,7 +352,7 @@ void RtspSession::emitOnPlay(){ }; //广播通用播放url鉴权事件 - auto flag = _emit_on_play ? false : NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); + auto flag = _emit_on_play ? false : NOTICE_EMIT(BroadcastMediaPlayedArgs, Broadcast::kBroadcastMediaPlayed, _media_info, invoker, *this); if (!flag) { //该事件无人监听,默认不鉴权 onRes(""); @@ -392,7 +392,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) { if(_rtsp_realm.empty()){ //广播是否需要rtsp专属认证事件 - if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, _media_info, invoker, static_cast(*this))) { + if (!NOTICE_EMIT(BroadcastOnGetRtspRealmArgs, Broadcast::kBroadcastOnGetRtspRealm, _media_info, invoker, *this)) { //无人监听此事件,说明无需认证 invoker(""); } @@ -497,7 +497,7 @@ void RtspSession::onAuthBasic(const string &realm, const string &auth_base64) { }; //此时必须提供明文密码 - if (!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, user, true, invoker, static_cast(*this))) { + if (!NOTICE_EMIT(BroadcastOnRtspAuthArgs, Broadcast::kBroadcastOnRtspAuth, _media_info, realm, user, true, invoker, *this)) { //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnP(this) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -581,7 +581,7 @@ void RtspSession::onAuthDigest(const string &realm,const string &auth_md5){ }; //此时可以提供明文或md5加密的密码 - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth, _media_info, realm, username, false, invoker, static_cast(*this))){ + if(!NOTICE_EMIT(BroadcastOnRtspAuthArgs, Broadcast::kBroadcastOnRtspAuth, _media_info, realm, username, false, invoker, *this)){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnP(this) << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -857,7 +857,11 @@ void RtspSession::handleReq_Play(const Parser &parser) { if (!_play_reader && _rtp_type != Rtsp::RTP_MULTICAST) { weak_ptr weak_self = static_pointer_cast(shared_from_this()); _play_reader = play_src->getRing()->attach(getPoller(), use_gop); - _play_reader->setGetInfoCB([weak_self]() { return weak_self.lock(); }); + _play_reader->setGetInfoCB([weak_self]() { + Any ret; + ret.set(static_pointer_cast(weak_self.lock())); + return ret; + }); _play_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/src/Shell/ShellSession.cpp b/src/Shell/ShellSession.cpp index fe0c23fd..85c682ee 100644 --- a/src/Shell/ShellSession.cpp +++ b/src/Shell/ShellSession.cpp @@ -135,9 +135,9 @@ inline void ShellSession::pleaseInputPasswd() { }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastShellLogin,_strUserName,passwd,invoker,static_cast(*this)); - if(!flag){ - //如果无人监听shell登录事件,那么默认shell无法登录 + auto flag = NOTICE_EMIT(BroadcastShellLoginArgs, Broadcast::kBroadcastShellLogin, _strUserName, passwd, invoker, *this); + if (!flag) { + // 如果无人监听shell登录事件,那么默认shell无法登录 onAuth("please listen kBroadcastShellLogin event"); } return true; diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h index 71797c1b..b691d15f 100644 --- a/src/TS/TSMediaSource.h +++ b/src/TS/TSMediaSource.h @@ -50,8 +50,8 @@ public: return _ring; } - void getPlayerList(const std::function> &info_list)> &cb, - const std::function(std::shared_ptr &&info)> &on_change) override { + void getPlayerList(const std::function &info_list)> &cb, + const std::function &on_change) override { _ring->getInfoList(cb, on_change); } diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index c3c18c5d..56d6bd8a 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -17,7 +17,7 @@ SrtTransportImp::~SrtTransportImp() { GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_total_bytes >= iFlowThreshold * 1024) { try { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, !_is_pusher, static_cast(*this)); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, !_is_pusher, *this); } catch (std::exception &ex) { WarnL << "Exception occurred: " << ex.what(); } @@ -172,9 +172,7 @@ void SrtTransportImp::emitOnPublish() { }; // 触发推流鉴权事件 - auto flag = NoticeCenter::Instance().emitEvent( - Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, - static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPublishArgs, Broadcast::kBroadcastMediaPublish, MediaOriginType::srt_push, _media_info, invoker, *this); if (!flag) { // 该事件无人监听,默认不鉴权 invoker("", ProtocolOption()); @@ -197,8 +195,7 @@ void SrtTransportImp::emitOnPlay() { }); }; - auto flag = NoticeCenter::Instance().emitEvent( - Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); + auto flag = NOTICE_EMIT(BroadcastMediaPlayedArgs, Broadcast::kBroadcastMediaPlayed, _media_info, invoker, *this); if (!flag) { doPlay(); } @@ -227,7 +224,11 @@ void SrtTransportImp::doPlay() { ts_src->pause(false); strong_self->_ts_reader = ts_src->getRing()->attach(strong_self->getPoller()); weak_ptr weak_session = strong_self->getSession(); - strong_self->_ts_reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); + strong_self->_ts_reader->setGetInfoCB([weak_session]() { + Any ret; + ret.set(static_pointer_cast(weak_session.lock())); + return ret; + }); strong_self->_ts_reader->setDetachCB([weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index a766607f..63f174eb 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -48,7 +48,11 @@ void WebRtcPlayer::onStartWebRTC() { _reader = playSrc->getRing()->attach(getPoller(), true); weak_ptr weak_self = static_pointer_cast(shared_from_this()); weak_ptr weak_session = static_pointer_cast(getSession()); - _reader->setGetInfoCB([weak_session]() { return weak_session.lock(); }); + _reader->setGetInfoCB([weak_session]() { + Any ret; + ret.set(static_pointer_cast(weak_session.lock())); + return ret; + }); _reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -67,6 +71,21 @@ void WebRtcPlayer::onStartWebRTC() { } strong_self->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached")); }); + + _reader->setMessageCB([weak_self] (const toolkit::Any &data) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + if (data.is()) { + auto &buffer = data.get(); + // PPID 51: 文本string + // PPID 53: 二进制 + strong_self->sendDatachannel(0, 51, buffer.data(), buffer.size()); + } else { + WarnL << "Send unknown message type to webrtc player: " << data.type_name(); + } + }); } } void WebRtcPlayer::onDestory() { @@ -77,7 +96,7 @@ void WebRtcPlayer::onDestory() { if (_reader && getSession()) { WarnL << "RTC播放器(" << _media_info.shortUrl() << ")结束播放,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, true, static_cast(*getSession())); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, true, *getSession()); } } WebRtcTransportImp::onDestory(); diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 0a62c0f3..92b4ad04 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -129,7 +129,7 @@ void WebRtcPusher::onDestory() { if (getSession()) { WarnL << "RTC推流器(" << _media_info.shortUrl() << ")结束推流,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, false, static_cast(*getSession())); + NOTICE_EMIT(BroadcastFlowReportArgs, Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, false, *getSession()); } } diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 1200c148..4ec13e6f 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -229,6 +229,19 @@ void WebRtcTransport::OnSctpAssociationMessageReceived( _sctp->SendSctpMessage(params, ppid, msg, len); } #endif + +void WebRtcTransport::sendDatachannel(uint16_t streamId, uint32_t ppid, const char *msg, size_t len) { +#ifdef ENABLE_SCTP + if (_sctp) { + RTC::SctpStreamParameters params; + params.streamId = streamId; + _sctp->SendSctpMessage(params, ppid, (uint8_t *)msg, len); + } +#else + WarnL << "WebRTC datachannel disabled!"; +#endif +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTuple *tuple) { @@ -1218,7 +1231,7 @@ void push_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana }; // rtsp推流需要鉴权 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, MediaOriginType::rtc_push, info, invoker, static_cast(sender)); + auto flag = NOTICE_EMIT(BroadcastMediaPublishArgs, Broadcast::kBroadcastMediaPublish, MediaOriginType::rtc_push, info, invoker, sender); if (!flag) { // 该事件无人监听,默认不鉴权 invoker("", ProtocolOption()); @@ -1252,7 +1265,7 @@ void play_plugin(Session &sender, const WebRtcArgs &args, const WebRtcPluginMana }; // 广播通用播放url鉴权事件 - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, info, invoker, static_cast(sender)); + auto flag = NOTICE_EMIT(BroadcastMediaPlayedArgs, Broadcast::kBroadcastMediaPlayed, info, invoker, sender); if (!flag) { // 该事件无人监听,默认不鉴权 invoker(""); diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 2534ca3b..2d1e6bbf 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -112,6 +112,7 @@ public: */ void sendRtpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); + void sendDatachannel(uint16_t streamId, uint32_t ppid, const char *msg, size_t len); const EventPoller::Ptr& getPoller() const; Session::Ptr getSession() const;