From 9e26a02fb1a1b30594d74b30edf19285344bbedb Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 12 Sep 2020 19:09:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0rtsp/rtmp=E6=8C=89=E9=9C=80?= =?UTF-8?q?=E8=BD=AC=E5=8D=8F=E8=AE=AE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/include/mk_events_objects.h | 2 - api/source/mk_events.cpp | 9 ++-- api/source/mk_events_objects.cpp | 3 +- api/tests/server.c | 4 +- conf/config.ini | 2 - server/WebHook.cpp | 22 ++++------ src/Common/MediaSource.cpp | 63 ++++++++++++++-------------- src/Common/MediaSource.h | 14 ++++--- src/Common/MultiMediaSourceMuxer.cpp | 8 +--- src/Common/MultiMediaSourceMuxer.h | 2 - src/Common/config.cpp | 2 - src/Common/config.h | 5 +-- src/Record/HlsMediaSource.h | 16 ++++--- src/Rtmp/RtmpMediaSource.h | 18 ++++---- src/Rtmp/RtmpMediaSourceImp.h | 5 +-- src/Rtmp/RtmpMediaSourceMuxer.h | 34 +++++++++++++-- src/Rtmp/RtmpSession.cpp | 19 ++++----- src/Rtp/RtpProcess.cpp | 7 ++-- src/Rtsp/RtspMediaSource.h | 18 ++++---- src/Rtsp/RtspMediaSourceImp.h | 5 +-- src/Rtsp/RtspMediaSourceMuxer.h | 46 ++++++++++++++++---- src/Rtsp/RtspSession.cpp | 17 ++++---- tests/test_server.cpp | 2 +- 23 files changed, 170 insertions(+), 153 deletions(-) diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index a6669fed..bdcca23c 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -276,13 +276,11 @@ typedef void* mk_publish_auth_invoker; /** * 执行Broadcast::PublishAuthInvoker * @param err_msg 为空或null则代表鉴权成功 - * @param enable_rtxp rtmp推流时是否运行转rtsp;rtsp推流时,是否允许转rtmp * @param enable_hls 是否允许转换hls * @param enable_mp4 是否运行MP4录制 */ API_EXPORT void API_CALL mk_publish_auth_invoker_do(const mk_publish_auth_invoker ctx, const char *err_msg, - int enable_rtxp, int enable_hls, int enable_mp4); diff --git a/api/source/mk_events.cpp b/api/source/mk_events.cpp index 12ee4a46..ace60eb0 100644 --- a/api/source/mk_events.cpp +++ b/api/source/mk_events.cpp @@ -101,11 +101,10 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ s_events.on_mk_media_publish((mk_media_info) &args, (mk_publish_auth_invoker) &invoker, (mk_sock_info) &sender); - }else{ - GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); - GET_CONFIG(bool,toHls,General::kPublishToHls); - GET_CONFIG(bool,toMP4,General::kPublishToMP4); - invoker("",toRtxp,toHls,toMP4); + } else { + GET_CONFIG(bool, toHls, General::kPublishToHls); + GET_CONFIG(bool, toMP4, General::kPublishToMP4); + invoker("", toHls, toMP4); } }); diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 98a233d8..72c30ed2 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -382,12 +382,11 @@ API_EXPORT void API_CALL mk_rtsp_auth_invoker_clone_release(const mk_rtsp_auth_i ///////////////////////////////////////////Broadcast::PublishAuthInvoker///////////////////////////////////////////// API_EXPORT void API_CALL mk_publish_auth_invoker_do(const mk_publish_auth_invoker ctx, const char *err_msg, - int enable_rtxp, int enable_hls, int enable_mp4){ assert(ctx); Broadcast::PublishAuthInvoker *invoker = (Broadcast::PublishAuthInvoker *)ctx; - (*invoker)(err_msg ? err_msg : "", enable_rtxp, enable_hls, enable_mp4); + (*invoker)(err_msg ? err_msg : "", enable_hls, enable_mp4); } API_EXPORT mk_publish_auth_invoker API_CALL mk_publish_auth_invoker_clone(const mk_publish_auth_invoker ctx){ diff --git a/api/tests/server.c b/api/tests/server.c index 07eaf12a..40ac9954 100644 --- a/api/tests/server.c +++ b/api/tests/server.c @@ -61,8 +61,8 @@ void API_CALL on_mk_media_publish(const mk_media_info url_info, mk_media_info_get_stream(url_info), mk_media_info_get_params(url_info)); - //允许推流,并且允许转rtxp/hls/mp4 - mk_publish_auth_invoker_do(invoker, NULL, 1, 1, 1); + //允许推流,并且允许转hls/mp4 + mk_publish_auth_invoker_do(invoker, NULL, 1, 1); } /** diff --git a/conf/config.ini b/conf/config.ini index 5964342f..72ca1b43 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -40,8 +40,6 @@ addMuteAudio=1 #拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, #如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) resetWhenRePlay=1 -#是否默认推流时转换成rtsp或rtmp,hook接口(on_publish)中可以覆盖该设置 -publishToRtxp=1 #是否默认推流时转换成hls,hook接口(on_publish)中可以覆盖该设置 publishToHls=1 #是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置 diff --git a/server/WebHook.cpp b/server/WebHook.cpp index c757bed9..05943403 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -195,11 +195,10 @@ void installWebHook(){ GET_CONFIG(string,hook_http_access,Hook::kOnHttpAccess); NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ - GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); GET_CONFIG(bool,toHls,General::kPublishToHls); GET_CONFIG(bool,toMP4,General::kPublishToMP4); if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){ - invoker("",toRtxp,toHls,toMP4); + invoker("", toHls, toMP4); return; } //异步执行该hook api,防止阻塞NoticeCenter @@ -211,27 +210,20 @@ void installWebHook(){ do_http_hook(hook_publish,body,[invoker](const Value &obj,const string &err){ if(err.empty()){ //推流鉴权成功 - bool enableRtxp = toRtxp; bool enableHls = toHls; bool enableMP4 = toMP4; - //兼容用户不传递enableRtxp、enableHls、enableMP4参数 - if(obj.isMember("enableRtxp")){ - enableRtxp = obj["enableRtxp"].asBool(); - } - - if(obj.isMember("enableHls")){ + //兼容用户不传递enableHls、enableMP4参数 + if (obj.isMember("enableHls")) { enableHls = obj["enableHls"].asBool(); } - - if(obj.isMember("enableMP4")){ + if (obj.isMember("enableMP4")) { enableMP4 = obj["enableMP4"].asBool(); } - - invoker(err,enableRtxp,enableHls,enableMP4); - }else{ + invoker(err, enableHls, enableMP4); + } else { //推流鉴权失败 - invoker(err,false, false, false); + invoker(err, false, false); } }); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 758f8ff3..05da87d0 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -93,13 +93,10 @@ bool MediaSource::close(bool force) { return listener->close(*this,force); } -void MediaSource::onNoneReader(){ +void MediaSource::onReaderChanged(int size) { auto listener = _listener.lock(); - if(!listener){ - return; - } - if (listener->totalReaderCount(*this) == 0) { - listener->onNoneReader(*this); + if (listener) { + listener->onReaderChanged(*this, size); } } @@ -475,45 +472,47 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string & /////////////////////////////////////MediaSourceEvent////////////////////////////////////// -void MediaSourceEvent::onNoneReader(MediaSource &sender){ +void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){ + if (size || totalReaderCount(sender)) { + //还有人观看该视频,不触发关闭事件 + return; + } + //没有任何人观看该视频源,表明该源可以关闭了 GET_CONFIG(string, record_app, Record::kAppName); GET_CONFIG(int, stream_none_reader_delay, General::kStreamNoneReaderDelayMS); - //如果mp4点播, 无人观看时我们强制关闭点播 bool is_mp4_vod = sender.getApp() == record_app; + weak_ptr weak_sender = sender.shared_from_this(); - //没有任何人观看该视频源,表明该源可以关闭了 - weak_ptr weakSender = sender.shared_from_this(); - _async_close_timer = std::make_shared(stream_none_reader_delay / 1000.0, [weakSender,is_mp4_vod]() { - auto strongSender = weakSender.lock(); - if (!strongSender) { + _async_close_timer = std::make_shared(stream_none_reader_delay / 1000.0, [weak_sender, is_mp4_vod]() { + auto strong_sender = weak_sender.lock(); + if (!strong_sender) { //对象已经销毁 return false; } - if (strongSender->totalReaderCount() != 0) { - //还有人消费 + if (strong_sender->totalReaderCount()) { + //还有人观看该视频,不触发关闭事件 return false; } - if(!is_mp4_vod){ + if (!is_mp4_vod) { //直播时触发无人观看事件,让开发者自行选择是否关闭 WarnL << "无人观看事件:" - << strongSender->getSchema() << "/" - << strongSender->getVhost() << "/" - << strongSender->getApp() << "/" - << strongSender->getId(); - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strongSender); - }else{ + << strong_sender->getSchema() << "/" + << strong_sender->getVhost() << "/" + << strong_sender->getApp() << "/" + << strong_sender->getId(); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strong_sender); + } else { //这个是mp4点播,我们自动关闭 WarnL << "MP4点播无人观看,自动关闭:" - << strongSender->getSchema() << "/" - << strongSender->getVhost() << "/" - << strongSender->getApp() << "/" - << strongSender->getId(); - strongSender->close(false); + << strong_sender->getSchema() << "/" + << strong_sender->getVhost() << "/" + << strong_sender->getApp() << "/" + << strong_sender->getId(); + strong_sender->close(false); } - return false; }, nullptr); } @@ -542,13 +541,13 @@ int MediaSourceEventInterceptor::totalReaderCount(MediaSource &sender) { return listener->totalReaderCount(sender); } -void MediaSourceEventInterceptor::onNoneReader(MediaSource &sender) { +void MediaSourceEventInterceptor::onReaderChanged(MediaSource &sender, int size) { auto listener = _listener.lock(); if (!listener) { - MediaSourceEvent::onNoneReader(sender); - return; + MediaSourceEvent::onReaderChanged(sender, size); + } else { + listener->onReaderChanged(sender, size); } - listener->onNoneReader(sender); } void MediaSourceEventInterceptor::onRegist(MediaSource &sender, bool regist) { diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 98a5ed1a..f92df7c2 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -49,8 +49,8 @@ public: virtual bool close(MediaSource &sender, bool force) { return false; } // 获取观看总人数 virtual int totalReaderCount(MediaSource &sender) = 0; - // 通知无人观看 - virtual void onNoneReader(MediaSource &sender); + // 通知观看人数变化 + virtual void onReaderChanged(MediaSource &sender, int size); //流注册或注销事件 virtual void onRegist(MediaSource &sender, bool regist) {}; @@ -79,7 +79,7 @@ public: bool seekTo(MediaSource &sender, uint32_t stamp) override; bool close(MediaSource &sender, bool force) override; int totalReaderCount(MediaSource &sender) override; - void onNoneReader(MediaSource &sender) override; + void onReaderChanged(MediaSource &sender, int size) override; void onRegist(MediaSource &sender, bool regist) override; bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; @@ -160,8 +160,8 @@ public: bool seekTo(uint32_t stamp); // 关闭该流 bool close(bool force); - // 该流无人观看 - void onNoneReader(); + // 该流观看人数变化 + void onReaderChanged(int size); // 开启或关闭录制 bool setupRecord(Recorder::type type, bool start, const string &custom_path); // 获取录制状态 @@ -249,6 +249,10 @@ public: } } + virtual void clearCache() { + _cache->clear(); + } + virtual void onFlush(std::shared_ptr &, bool key_pos) = 0; private: diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index b7b4942e..771aa41c 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -18,21 +18,17 @@ MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, con bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) { if (enable_rtmp) { _rtmp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); - _enable_rtxp = true; } if (enable_rtsp) { _rtsp = std::make_shared(vhost, app, stream, std::make_shared(dur_sec)); - _enable_rtxp = true; } if (enable_hls) { _hls = Recorder::createRecorder(Recorder::type_hls, vhost, app, stream); - _enable_record = true; } if (enable_mp4) { _mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream); - _enable_record = true; } } @@ -101,7 +97,6 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo //停止录制 _hls = nullptr; } - _enable_record = _hls || _mp4; return true; } case Recorder::type_mp4 : { @@ -112,7 +107,6 @@ bool MultiMuxerPrivate::setupRecord(MediaSource &sender, Recorder::type type, bo //停止录制 _mp4 = nullptr; } - _enable_record = _hls || _mp4; return true; } default : return false; @@ -164,7 +158,7 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { } bool MultiMuxerPrivate::isEnabled(){ - return _enable_rtxp || _enable_record; + return (_rtmp ? _rtmp->isEnabled() : false) || (_rtsp ? _rtsp->isEnabled() : false) || _hls || _mp4; } void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) { diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 358f9b1b..9c148a75 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -49,8 +49,6 @@ private: MediaSource::Ptr getHlsMediaSource() const; private: - bool _enable_rtxp = false; - bool _enable_record = false; Listener *_track_listener = nullptr; RtmpMediaSourceMuxer::Ptr _rtmp; RtspMediaSourceMuxer::Ptr _rtsp; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 90e171ab..69e7ed42 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -63,7 +63,6 @@ const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; const string kEnableVhost = GENERAL_FIELD"enableVhost"; const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio"; const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay"; -const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp"; const string kPublishToHls = GENERAL_FIELD"publishToHls"; const string kPublishToMP4 = GENERAL_FIELD"publishToMP4"; const string kMergeWriteMS = GENERAL_FIELD"mergeWriteMS"; @@ -76,7 +75,6 @@ onceToken token([](){ mINI::Instance()[kEnableVhost] = 0; mINI::Instance()[kAddMuteAudio] = 1; mINI::Instance()[kResetWhenRePlay] = 1; - mINI::Instance()[kPublishToRtxp] = 1; mINI::Instance()[kPublishToHls] = 1; mINI::Instance()[kPublishToMP4] = 0; mINI::Instance()[kMergeWriteMS] = 0; diff --git a/src/Common/config.h b/src/Common/config.h index 6832aec1..0ebdf980 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -86,8 +86,7 @@ extern const string kBroadcastOnRtspAuth; //如果errMessage为空则代表鉴权成功 //enableHls: 是否允许转换hls //enableMP4: 是否运行MP4录制 -//enableRtxp: rtmp推流时是否运行转rtsp;rtsp推流时,是否允许转rtmp -typedef std::function PublishAuthInvoker; +typedef std::function PublishAuthInvoker; //收到rtsp/rtmp推流事件广播,通过该事件控制推流鉴权 extern const string kBroadcastMediaPublish; @@ -165,8 +164,6 @@ extern const string kAddMuteAudio; //拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, //如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) extern const string kResetWhenRePlay; -//是否默认推流时转换成rtsp或rtmp,hook接口(on_publish)中可以覆盖该设置 -extern const string kPublishToRtxp ; //是否默认推流时转换成hls,hook接口(on_publish)中可以覆盖该设置 extern const string kPublishToHls ; //是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置 diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index 4c522f9e..b2813a4b 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -26,7 +26,7 @@ public: _ring = std::make_shared(); } - virtual ~HlsMediaSource() = default; + ~HlsMediaSource() override = default; /** * 获取媒体源的环形缓冲 @@ -47,10 +47,10 @@ public: * 注册hls */ void registHls(){ - if(!_registed){ - regist(); + if (!_registed) { _registed = true; - onNoneReader(); + onReaderChanged(0); + regist(); } } @@ -62,12 +62,10 @@ private: void modifyReaderCount(bool add) { if (add) { ++_readerCount; - return; - } - - if (--_readerCount == 0) { - onNoneReader(); + } else { + --_readerCount; } + onReaderChanged(_readerCount); } private: atomic_int _readerCount; diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 284ef686..cd3f6b5c 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -60,7 +60,7 @@ public: MediaSource(RTMP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) { } - virtual ~RtmpMediaSource() {} + ~RtmpMediaSource() override{} /** * 获取媒体源的环形缓冲 @@ -134,7 +134,7 @@ public: if (!_ring) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) { + auto lam = [weakSelf](int size) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; @@ -174,6 +174,11 @@ public: return ret; } + void clearCache() override{ + PacketCache::clearCache(); + _ring->clearCache(); + } + private: /** * 批量flush rtmp包时触发该函数 @@ -185,15 +190,6 @@ private: _ring->write(rtmp_list, _have_video ? key_pos : true); } - /** - * 每次增减消费者都会触发该函数 - */ - void onReaderChanged(int size) { - if (size == 0) { - onNoneReader(); - } - } - private: bool _have_video = false; int _ring_size; diff --git a/src/Rtmp/RtmpMediaSourceImp.h b/src/Rtmp/RtmpMediaSourceImp.h index f9695330..3b3de9d7 100644 --- a/src/Rtmp/RtmpMediaSourceImp.h +++ b/src/Rtmp/RtmpMediaSourceImp.h @@ -77,13 +77,12 @@ public: /** * 设置协议转换 - * @param enableRtsp 是否转换成rtsp * @param enableHls 是否转换成hls * @param enableMP4 是否mp4录制 */ - void setProtocolTranslation(bool enableRtsp, bool enableHls, bool enableMP4) { + void setProtocolTranslation(bool enableHls, bool enableMP4) { //不重复生成rtmp - _muxer = std::make_shared(getVhost(), getApp(), getId(), _demuxer->getDuration(), enableRtsp, false, enableHls, enableMP4); + _muxer = std::make_shared(getVhost(), getApp(), getId(), _demuxer->getDuration(), true, false, enableHls, enableMP4); _muxer->setMediaListener(getListener()); _muxer->setTrackListener(static_pointer_cast(shared_from_this())); //让_muxer对象拦截一部分事件(比如说录像相关事件) diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index 0a0c9083..ddc1c601 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -16,7 +16,8 @@ namespace mediakit { -class RtmpMediaSourceMuxer : public RtmpMuxer { +class RtmpMediaSourceMuxer : public RtmpMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; @@ -27,10 +28,11 @@ public: _media_src = std::make_shared(vhost, strApp, strId); getRtmpRing()->setDelegate(_media_src); } - virtual ~RtmpMediaSourceMuxer(){} + + ~RtmpMediaSourceMuxer() override{} void setListener(const std::weak_ptr &listener){ - _media_src->setListener(listener); + _listener = listener; } void setTimeStamp(uint32_t stamp){ @@ -43,10 +45,36 @@ public: void onAllTrackReady(){ makeConfigPacket(); + _media_src->setListener(shared_from_this()); _media_src->setMetaData(getMetadata()); } + void onReaderChanged(MediaSource &sender, int size) override { + _enabled = size; + if (!size) { + _clear_cache = true; + } + MediaSourceEventInterceptor::onReaderChanged(sender, size); + } + + void inputFrame(const Frame::Ptr &frame) override { + if (_clear_cache) { + _clear_cache = false; + _media_src->clearCache(); + } + if (_enabled) { + RtmpMuxer::inputFrame(frame); + } + } + + bool isEnabled() { + //缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存 + return _clear_cache ? true : _enabled; + } + private: + bool _enabled = true; + bool _clear_cache = false; RtmpMediaSource::Ptr _media_src; }; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 03b2760a..91ea29e2 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -126,7 +126,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _media_info.parse(_tc_url + "/" + getStreamId(dec.load())); _media_info._schema = RTMP_SCHEMA; - auto on_res = [this,pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4){ + auto on_res = [this,pToken](const string &err, bool enableHls, bool enableMP4){ auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, @@ -150,7 +150,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { _publisher_src.reset(new RtmpMediaSourceImp(_media_info._vhost, _media_info._app, _media_info._streamid)); _publisher_src->setListener(dynamic_pointer_cast(shared_from_this())); //设置转协议 - _publisher_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4); + _publisher_src->setProtocolTranslation(enableHls, enableMP4); //如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能 getSock()->setReadBuffer(std::make_shared(256 * 1024)); @@ -159,30 +159,29 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { if(_media_info._app.empty() || _media_info._streamid.empty()){ //不允许莫名其妙的推流url - on_res("rtmp推流url非法", false, false, false); + on_res("rtmp推流url非法", false, false); return; } - Broadcast::PublishAuthInvoker invoker = [weak_self,on_res,pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4){ + Broadcast::PublishAuthInvoker invoker = [weak_self, on_res, pToken](const string &err, bool enableHls, bool enableMP4) { auto strongSelf = weak_self.lock(); - if(!strongSelf){ + if (!strongSelf) { return; } - strongSelf->async([weak_self,on_res,err,pToken,enableRtxp,enableHls,enableMP4](){ + strongSelf->async([weak_self, on_res, err, pToken, enableHls, enableMP4]() { auto strongSelf = weak_self.lock(); - if(!strongSelf){ + if (!strongSelf) { return; } - on_res(err, enableRtxp, enableHls, enableMP4); + on_res(err, enableHls, enableMP4); }); }; auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); if(!flag){ //该事件无人监听,默认鉴权成功 - GET_CONFIG(bool,to_rtxp,General::kPublishToRtxp); GET_CONFIG(bool,to_hls,General::kPublishToHls); GET_CONFIG(bool,to_mp4,General::kPublishToMP4); - on_res("", to_rtxp, to_hls, to_mp4); + on_res("", to_hls, to_mp4); } } diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index cf58b9e8..23b4c754 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -243,7 +243,7 @@ void RtpProcess::setListener(const std::weak_ptr &listener){ void RtpProcess::emitOnPublish() { weak_ptr weak_self = shared_from_this(); - Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableRtxp, bool enableHls, bool enableMP4) { + Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) { auto strongSelf = weak_self.lock(); if (!strongSelf) { return; @@ -252,7 +252,7 @@ void RtpProcess::emitOnPublish() { strongSelf->_muxer = std::make_shared(strongSelf->_media_info._vhost, strongSelf->_media_info._app, strongSelf->_media_info._streamid, 0, - enableRtxp, enableRtxp, enableHls, enableMP4); + true, true, enableHls, enableMP4); strongSelf->_muxer->setMediaListener(strongSelf->_listener); InfoP(strongSelf) << "允许RTP推流"; } else { @@ -264,10 +264,9 @@ void RtpProcess::emitOnPublish() { auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 - GET_CONFIG(bool, toRtxp, General::kPublishToRtxp); GET_CONFIG(bool, toHls, General::kPublishToHls); GET_CONFIG(bool, toMP4, General::kPublishToMP4); - invoker("", toRtxp, toHls, toMP4); + invoker("", toHls, toMP4); } } diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index 4bf4ca28..a67a4b34 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -56,7 +56,7 @@ public: int ring_size = RTP_GOP_SIZE) : MediaSource(RTSP_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} - virtual ~RtspMediaSource() {} + ~RtspMediaSource() override{} /** * 获取媒体源的环形缓冲 @@ -166,7 +166,7 @@ public: } if (!_ring) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto lam = [weakSelf](const EventPoller::Ptr &, int size, bool) { + auto lam = [weakSelf](int size) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; @@ -184,6 +184,11 @@ public: PacketCache::inputPacket(rtp->type == TrackVideo, rtp, keyPos); } + void clearCache() override{ + PacketCache::clearCache(); + _ring->clearCache(); + } + private: /** * 批量flush rtp包时触发该函数 @@ -195,15 +200,6 @@ private: _ring->write(rtp_list, _have_video ? key_pos : true); } - /** - * 每次增减消费者都会触发该函数 - */ - void onReaderChanged(int size) { - if (size == 0) { - onNoneReader(); - } - } - private: bool _have_video = false; int _ring_size; diff --git a/src/Rtsp/RtspMediaSourceImp.h b/src/Rtsp/RtspMediaSourceImp.h index 526b4687..13c4fea4 100644 --- a/src/Rtsp/RtspMediaSourceImp.h +++ b/src/Rtsp/RtspMediaSourceImp.h @@ -68,13 +68,12 @@ public: /** * 设置协议转换 - * @param enableRtmp 是否转换成rtmp * @param enableHls 是否转换成hls * @param enableMP4 是否mp4录制 */ - void setProtocolTranslation(bool enableRtmp,bool enableHls,bool enableMP4){ + void setProtocolTranslation(bool enableHls,bool enableMP4){ //不重复生成rtsp - _muxer = std::make_shared(getVhost(), getApp(), getId(), _demuxer->getDuration(), false, enableRtmp, enableHls, enableMP4); + _muxer = std::make_shared(getVhost(), getApp(), getId(), _demuxer->getDuration(), false, true, enableHls, enableMP4); _muxer->setMediaListener(getListener()); _muxer->setTrackListener(static_pointer_cast(shared_from_this())); //让_muxer对象拦截一部分事件(比如说录像相关事件) diff --git a/src/Rtsp/RtspMediaSourceMuxer.h b/src/Rtsp/RtspMediaSourceMuxer.h index f5e7b2e4..fdb815cd 100644 --- a/src/Rtsp/RtspMediaSourceMuxer.h +++ b/src/Rtsp/RtspMediaSourceMuxer.h @@ -16,7 +16,8 @@ namespace mediakit { -class RtspMediaSourceMuxer : public RtspMuxer { +class RtspMediaSourceMuxer : public RtspMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; @@ -24,29 +25,56 @@ public: const string &strApp, const string &strId, const TitleSdp::Ptr &title = nullptr) : RtspMuxer(title){ - _mediaSouce = std::make_shared(vhost,strApp,strId); - getRtpRing()->setDelegate(_mediaSouce); + _media_src = std::make_shared(vhost,strApp,strId); + getRtpRing()->setDelegate(_media_src); } - virtual ~RtspMediaSourceMuxer(){} + + ~RtspMediaSourceMuxer() override{} void setListener(const std::weak_ptr &listener){ - _mediaSouce->setListener(listener); + _listener = listener; } int readerCount() const{ - return _mediaSouce->readerCount(); + return _media_src->readerCount(); } void setTimeStamp(uint32_t stamp){ - _mediaSouce->setTimeStamp(stamp); + _media_src->setTimeStamp(stamp); } void onAllTrackReady(){ - _mediaSouce->setSdp(getSdp()); + _media_src->setListener(shared_from_this()); + _media_src->setSdp(getSdp()); + } + + void onReaderChanged(MediaSource &sender, int size) override { + _enabled = size; + if (!size) { + _clear_cache = true; + } + MediaSourceEventInterceptor::onReaderChanged(sender, size); + } + + void inputFrame(const Frame::Ptr &frame) override { + if (_clear_cache) { + _clear_cache = false; + _media_src->clearCache(); + } + if (_enabled) { + RtspMuxer::inputFrame(frame); + } + } + + bool isEnabled() { + //缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存 + return _clear_cache ? true : _enabled; } private: - RtspMediaSource::Ptr _mediaSouce; + bool _enabled = true; + bool _clear_cache = false; + RtspMediaSource::Ptr _media_src; }; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 42077ba0..6f7a6a65 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -252,7 +252,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ send_SessionNotFound(); throw SockException(Err_shutdown, _sdp_track.empty() ? "can not find any availabe track when record" : "session not found when record"); } - auto onRes = [this](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){ + auto onRes = [this](const string &err, bool enableHls, bool enableMP4){ bool authSuccess = err.empty(); if(!authSuccess){ sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); @@ -261,7 +261,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ } //设置转协议 - _push_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4); + _push_src->setProtocolTranslation(enableHls, enableMP4); _StrPrinter rtp_info; for(auto &track : _sdp_track){ @@ -283,17 +283,17 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ }; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){ + Broadcast::PublishAuthInvoker invoker = [weakSelf, onRes](const string &err, bool enableHls, bool enableMP4) { auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + if (!strongSelf) { return; } - strongSelf->async([weakSelf,onRes,err,enableRtxp,enableHls,enableMP4](){ + strongSelf->async([weakSelf, onRes, err, enableHls, enableMP4]() { auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + if (!strongSelf) { return; } - onRes(err,enableRtxp,enableHls,enableMP4); + onRes(err, enableHls, enableMP4); }); }; @@ -301,10 +301,9 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 - GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); GET_CONFIG(bool,toHls,General::kPublishToHls); GET_CONFIG(bool,toMP4,General::kPublishToMP4); - onRes("",toRtxp,toHls,toMP4); + onRes("",toHls,toMP4); } } diff --git a/tests/test_server.cpp b/tests/test_server.cpp index aa4b8780..4e37b447 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -141,7 +141,7 @@ void initEventListener() { NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, [](BroadcastMediaPublishArgs) { DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs; - invoker("", true, true, false);//鉴权成功 + invoker("", true, false);//鉴权成功 //invoker("this is auth failed message");//鉴权失败 });