diff --git a/server/WebApi.cpp b/server/WebApi.cpp index d1cb621c..580e34c8 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -395,25 +395,21 @@ void installWebApi() { API_REGIST(api,getMediaList,{ CHECK_SECRET(); //获取所有MediaSource列表 - MediaSource::for_each_media([&](const string &schema, - const string &vhost, - const string &app, - const string &stream, - const MediaSource::Ptr &media){ - if(!allArgs["schema"].empty() && allArgs["schema"] != schema){ + MediaSource::for_each_media([&](const MediaSource::Ptr &media){ + if(!allArgs["schema"].empty() && allArgs["schema"] != media->getSchema()){ return; } - if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){ + if(!allArgs["vhost"].empty() && allArgs["vhost"] != media->getVhost()){ return; } - if(!allArgs["app"].empty() && allArgs["app"] != app){ + if(!allArgs["app"].empty() && allArgs["app"] != media->getApp()){ return; } Value item; - item["schema"] = schema; - item["vhost"] = vhost; - item["app"] = app; - item["stream"] = stream; + item["schema"] = media->getSchema(); + item["vhost"] = media->getVhost(); + item["app"] = media->getApp(); + item["stream"] = media->getId(); val["data"].append(item); }); }); @@ -453,21 +449,17 @@ void installWebApi() { int count_hit = 0; int count_closed = 0; list media_list; - MediaSource::for_each_media([&](const string &schema, - const string &vhost, - const string &app, - const string &stream, - const MediaSource::Ptr &media){ - if(!allArgs["schema"].empty() && allArgs["schema"] != schema){ + MediaSource::for_each_media([&](const MediaSource::Ptr &media){ + if(!allArgs["schema"].empty() && allArgs["schema"] != media->getSchema()){ return; } - if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){ + if(!allArgs["vhost"].empty() && allArgs["vhost"] != media->getVhost()){ return; } - if(!allArgs["app"].empty() && allArgs["app"] != app){ + if(!allArgs["app"].empty() && allArgs["app"] != media->getApp()){ return; } - if(!allArgs["stream"].empty() && allArgs["stream"] != stream){ + if(!allArgs["stream"].empty() && allArgs["stream"] != media->getId()){ return; } ++count_hit; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index cb668f37..96b63332 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -38,8 +38,142 @@ namespace mediakit { recursive_mutex MediaSource::g_mtxMediaSrc; MediaSource::SchemaVhostAppStreamMap MediaSource::g_mapMediaSrc; +MediaSource::MediaSource(const string &strSchema, const string &strVhost, const string &strApp, const string &strId) : + _strSchema(strSchema), + _strApp(strApp), + _strId(strId) { + if (strVhost.empty()) { + _strVhost = DEFAULT_VHOST; + } else { + _strVhost = strVhost; + } +} -void MediaSource::findAsync(const MediaInfo &info, +MediaSource::~MediaSource() { + unregist(); +} + +const string& MediaSource::getSchema() const { + return _strSchema; +} + +const string& MediaSource::getVhost() const { + return _strVhost; +} + +const string& MediaSource::getApp() const { + //获取该源的id + return _strApp; +} + +const string& MediaSource::getId() const { + return _strId; +} + +vector MediaSource::getTracks(bool trackReady) const { + auto strongPtr = _track_source.lock(); + if(strongPtr){ + return strongPtr->getTracks(trackReady); + } + return vector(); +} + +void MediaSource::setTrackSource(const std::weak_ptr &track_src) { + _track_source = track_src; +} + +void MediaSource::setListener(const std::weak_ptr &listener){ + _listener = listener; +} + +const std::weak_ptr& MediaSource::getListener() const{ + return _listener; +} + +bool MediaSource::seekTo(uint32_t ui32Stamp) { + auto listener = _listener.lock(); + if(!listener){ + return false; + } + return listener->seekTo(*this,ui32Stamp); +} + +bool MediaSource::close(bool force) { + auto listener = _listener.lock(); + if(!listener){ + return false; + } + return listener->close(*this,force); +} + +void MediaSource::onNoneReader(){ + auto listener = _listener.lock(); + if(!listener){ + return; + } + listener->onNoneReader(*this); +} + +void MediaSource::for_each_media(const function &cb) { + lock_guard lock(g_mtxMediaSrc); + for (auto &pr0 : g_mapMediaSrc) { + for (auto &pr1 : pr0.second) { + for (auto &pr2 : pr1.second) { + for (auto &pr3 : pr2.second) { + auto src = pr3.second.lock(); + if(src){ + cb(src); + } + } + } + } + } +} + +template +static bool searchMedia(MAP &map, + const string &schema, + const string &vhost, + const string &app, + const string &id, + FUNC &&func) { + auto it0 = map.find(schema); + if (it0 == map.end()) { + //未找到协议 + return false; + } + auto it1 = it0->second.find(vhost); + if (it1 == it0->second.end()) { + //未找到vhost + return false; + } + auto it2 = it1->second.find(app); + if (it2 == it1->second.end()) { + //未找到app + return false; + } + auto it3 = it2->second.find(id); + if (it3 == it2->second.end()) { + //未找到streamId + return false; + } + return func(it0, it1, it2, it3); +} + +template +static void eraseIfEmpty(MAP &map, IT0 it0, IT1 it1, IT2 it2) { + if (it2->second.empty()) { + it1->second.erase(it2); + if (it1->second.empty()) { + it0->second.erase(it1); + if (it0->second.empty()) { + map.erase(it0); + } + } + } +}; + +void findAsync_l(const MediaInfo &info, const std::shared_ptr &session, bool retry, const function &cb){ @@ -99,12 +233,17 @@ void MediaSource::findAsync(const MediaInfo &info, } DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid; //再找一遍媒体源,一般能找到 - findAsync(info,strongSession,false,cb); + findAsync_l(info,strongSession,false,cb); }, false); }; //监听媒体注册事件 NoticeCenter::Instance().addListener(listener_tag, Broadcast::kBroadcastMediaChanged, onRegist); } + +void MediaSource::findAsync(const MediaInfo &info, const std::shared_ptr &session,const function &cb){ + return findAsync_l(info, session, true, cb); +} + MediaSource::Ptr MediaSource::find( const string &schema, const string &vhost_tmp, @@ -124,20 +263,19 @@ MediaSource::Ptr MediaSource::find( lock_guard lock(g_mtxMediaSrc); MediaSource::Ptr ret; //查找某一媒体源,找到后返回 - searchMedia(schema, vhost, app, id, - [&](SchemaVhostAppStreamMap::iterator &it0 , - VhostAppStreamMap::iterator &it1, - AppStreamMap::iterator &it2, - StreamMap::iterator &it3){ - ret = it3->second.lock(); - if(!ret){ - //该对象已经销毁 - it2->second.erase(it3); - eraseIfEmpty(it0,it1,it2); - return false; - } - return true; - }); + searchMedia(g_mapMediaSrc, schema, vhost, app, id, [&](SchemaVhostAppStreamMap::iterator &it0, + VhostAppStreamMap::iterator &it1, + AppStreamMap::iterator &it2, + StreamMap::iterator &it3) { + ret = it3->second.lock(); + if (!ret) { + //该对象已经销毁 + it2->second.erase(it3); + eraseIfEmpty(g_mapMediaSrc,it0, it1, it2); + return false; + } + return true; + }); if(!ret && bMake){ //未查找媒体源,则创建一个 ret = MediaReader::onMakeMediaSource(schema, vhost,app,id); @@ -166,17 +304,17 @@ void MediaSource::regist() { bool MediaSource::unregist() { //反注册该源 lock_guard lock(g_mtxMediaSrc); - return searchMedia(_strSchema, _strVhost, _strApp, _strId, [&](SchemaVhostAppStreamMap::iterator &it0 , - VhostAppStreamMap::iterator &it1, - AppStreamMap::iterator &it2, - StreamMap::iterator &it3){ + return searchMedia(g_mapMediaSrc, _strSchema, _strVhost, _strApp, _strId,[&](SchemaVhostAppStreamMap::iterator &it0, + VhostAppStreamMap::iterator &it1, + AppStreamMap::iterator &it2, + StreamMap::iterator &it3) { auto strongMedia = it3->second.lock(); - if(strongMedia && this != strongMedia.get()){ + if (strongMedia && this != strongMedia.get()) { //不是自己,不允许反注册 return false; } it2->second.erase(it3); - eraseIfEmpty(it0,it1,it2); + eraseIfEmpty(g_mapMediaSrc, it0, it1, it2); unregisted(); return true; }); @@ -192,6 +330,9 @@ void MediaSource::unregisted(){ *this); } + +/////////////////////////////////////MediaInfo////////////////////////////////////// + void MediaInfo::parse(const string &url){ //string url = "rtsp://127.0.0.1:8554/live/id?key=val&a=1&&b=2&vhost=vhost.com"; auto schema_pos = url.find("://"); @@ -241,6 +382,8 @@ void MediaInfo::parse(const string &url){ } } +/////////////////////////////////////MediaSourceEvent////////////////////////////////////// + void MediaSourceEvent::onNoneReader(MediaSource &sender){ //没有任何读取器消费该源,表明该源可以关闭了 WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 1e31b57b..69b77ccc 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -45,7 +45,7 @@ using namespace toolkit; namespace toolkit{ class TcpSession; -}//namespace toolkit +}// namespace toolkit namespace mediakit { @@ -54,17 +54,18 @@ class MediaSourceEvent{ public: MediaSourceEvent(){}; virtual ~MediaSourceEvent(){}; -public: + + // 通知拖动进度条 virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){ - //拖动进度条 return false; } + // 通知其停止推流 virtual bool close(MediaSource &sender,bool force) { - //通知其停止推流 return false; } + // 通知无人观看 virtual void onNoneReader(MediaSource &sender); }; @@ -92,6 +93,9 @@ public: string _param_strs; }; +/** + * 媒体源,任何rtsp/rtmp的直播流都源自该对象 + */ class MediaSource: public TrackSource, public enable_shared_from_this { public: typedef std::shared_ptr Ptr; @@ -100,152 +104,59 @@ public: typedef unordered_map VhostAppStreamMap; typedef unordered_map SchemaVhostAppStreamMap; - MediaSource(const string &strSchema, - const string &strVhost, - const string &strApp, - const string &strId) : - _strSchema(strSchema), - _strApp(strApp), - _strId(strId) { - if(strVhost.empty()){ - _strVhost = DEFAULT_VHOST; - }else{ - _strVhost = strVhost; - } - } - virtual ~MediaSource() { - unregist(); - } + MediaSource(const string &strSchema, const string &strVhost, const string &strApp, const string &strId) ; + virtual ~MediaSource() ; - static Ptr find(const string &schema, - const string &vhost, - const string &app, - const string &id, - bool bMake = true) ; - - static void findAsync(const MediaInfo &info, - const std::shared_ptr &session, - bool retry, - const function &cb); - - const string& getSchema() const { - return _strSchema; - } - const string& getVhost() const { - return _strVhost; - } - const string& getApp() const { - //获取该源的id - return _strApp; - } - const string& getId() const { - return _strId; - } - - bool seekTo(uint32_t ui32Stamp) { - auto listener = _listener.lock(); - if(!listener){ - return false; - } - return listener->seekTo(*this,ui32Stamp); - } + // 获取协议类型 + const string& getSchema() const; + // 虚拟主机 + const string& getVhost() const; + // 应用名 + const string& getApp() const; + // 流id + const string& getId() const; + // 获取所有Track + vector getTracks(bool trackReady = true) const override; + // 获取监听者 + const std::weak_ptr& getListener() const; + // 设置TrackSource + void setTrackSource(const std::weak_ptr &track_src); + // 设置监听者 + virtual void setListener(const std::weak_ptr &listener); + // 获取观看者个数 + virtual int readerCount() = 0; + // 获取流当前时间戳 virtual uint32_t getTimeStamp(TrackType trackType) = 0; - bool close(bool force) { - auto listener = _listener.lock(); - if(!listener){ - return false; - } - return listener->close(*this,force); - } + // 拖动进度条 + bool seekTo(uint32_t ui32Stamp); + // 关闭该流 + bool close(bool force); + // 该流无人观看 + void onNoneReader(); - void onNoneReader(){ - auto listener = _listener.lock(); - if(!listener){ - return; - } - listener->onNoneReader(*this); - } + // 同步查找流 + static Ptr find(const string &schema, const string &vhost, const string &app, const string &id, bool bMake = true) ; + // 异步查找流 + static void findAsync(const MediaInfo &info, const std::shared_ptr &session, const function &cb); + // 遍历所有流 + static void for_each_media(const function &cb); - virtual void setListener(const std::weak_ptr &listener){ - _listener = listener; - } - - std::weak_ptr getListener(){ - return _listener; - } - - template - static void for_each_media(FUN && fun){ - lock_guard lock(g_mtxMediaSrc); - for (auto &pr0 : g_mapMediaSrc){ - for(auto &pr1 : pr0.second){ - for(auto &pr2 : pr1.second){ - for(auto &pr3 : pr2.second){ - fun(pr0.first,pr1.first,pr2.first,pr3.first,pr3.second.lock()); - } - } - } - } - } - - virtual int readerCount() = 0; protected: void regist() ; bool unregist() ; -private: - template - static bool searchMedia(const string &schema, - const string &vhost, - const string &app, - const string &id, - FUN &&fun){ - auto it0 = g_mapMediaSrc.find(schema); - if (it0 == g_mapMediaSrc.end()) { - //未找到协议 - return false; - } - auto it1 = it0->second.find(vhost); - if(it1 == it0->second.end()){ - //未找到vhost - return false; - } - auto it2 = it1->second.find(app); - if(it2 == it1->second.end()){ - //未找到app - return false; - } - auto it3 = it2->second.find(id); - if(it3 == it2->second.end()){ - //未找到streamId - return false; - } - return fun(it0,it1,it2,it3); - } - template - static void eraseIfEmpty(IT0 it0,IT1 it1,IT2 it2){ - if(it2->second.empty()){ - it1->second.erase(it2); - if(it1->second.empty()){ - it0->second.erase(it1); - if(it0->second.empty()){ - g_mapMediaSrc.erase(it0); - } - } - } - }; - void unregisted(); -protected: - std::weak_ptr _listener; + private: - string _strSchema;//协议类型 - string _strVhost; //vhost - string _strApp; //媒体app - string _strId; //媒体id - static SchemaVhostAppStreamMap g_mapMediaSrc; //静态的媒体源表 - static recursive_mutex g_mtxMediaSrc; //访问静态的媒体源表的互斥锁 + string _strSchema; + string _strVhost; + string _strApp; + string _strId; + std::weak_ptr _listener; + weak_ptr _track_source; + static SchemaVhostAppStreamMap g_mapMediaSrc; + static recursive_mutex g_mtxMediaSrc; }; } /* namespace mediakit */ diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 4bf40c85..e1723309 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -31,7 +31,7 @@ #include "Rtmp/RtmpMediaSourceMuxer.h" #include "MediaFile/MediaRecorder.h" -class MultiMediaSourceMuxer : public MediaSink{ +class MultiMediaSourceMuxer : public MediaSink , public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; @@ -128,9 +128,11 @@ protected: */ void onAllTrackReady() override{ if(_rtmp) { + _rtmp->setTrackSource(shared_from_this()); _rtmp->onAllTrackReady(); } if(_rtsp) { + _rtmp->setTrackSource(shared_from_this()); _rtsp->onAllTrackReady(); } } diff --git a/src/Extension/Track.h b/src/Extension/Track.h index b4771027..229a13ea 100644 --- a/src/Extension/Track.h +++ b/src/Extension/Track.h @@ -141,9 +141,7 @@ public: * @param trackReady 是否获取全部已经准备好的Track * @return */ - virtual vector getTracks(bool trackReady = true) const { - return vector(); - } + virtual vector getTracks(bool trackReady = true) const = 0; /** * 获取特定Track diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 1c36e5cc..88fa99e0 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -197,7 +197,7 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf,bClose,this,cb](const MediaSource::Ptr &src){ + MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf,bClose,this,cb](const MediaSource::Ptr &src){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ //本对象已经销毁 diff --git a/src/Player/MediaPlayer.cpp b/src/Player/MediaPlayer.cpp index 73c04d54..53365d1a 100644 --- a/src/Player/MediaPlayer.cpp +++ b/src/Player/MediaPlayer.cpp @@ -42,13 +42,13 @@ MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller) { MediaPlayer::~MediaPlayer() { } void MediaPlayer::play(const string &strUrl) { - _parser = PlayerBase::createPlayer(_poller,strUrl); - _parser->setOnShutdown(_shutdownCB); - _parser->setOnPlayResult(_playResultCB); - _parser->setOnResume(_resumeCB); - _parser->setMediaSouce(_pMediaSrc); - _parser->mINI::operator=(*this); - _parser->play(strUrl); + _delegate = PlayerBase::createPlayer(_poller,strUrl); + _delegate->setOnShutdown(_shutdownCB); + _delegate->setOnPlayResult(_playResultCB); + _delegate->setOnResume(_resumeCB); + _delegate->setMediaSouce(_pMediaSrc); + _delegate->mINI::operator=(*this); + _delegate->play(strUrl); } EventPoller::Ptr MediaPlayer::getPoller(){ @@ -56,14 +56,14 @@ EventPoller::Ptr MediaPlayer::getPoller(){ } void MediaPlayer::pause(bool bPause) { - if (_parser) { - _parser->pause(bPause); + if (_delegate) { + _delegate->pause(bPause); } } void MediaPlayer::teardown() { - if (_parser) { - _parser->teardown(); + if (_delegate) { + _delegate->teardown(); } } diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index 54aa0f3c..7e8f55ff 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -127,6 +127,13 @@ public: * @return */ virtual float getPacketLossRate(TrackType trackType) const {return 0; } + + /** + * 获取所有track + */ + vector getTracks(bool trackReady = true) const override{ + return vector(); + } protected: virtual void onShutdown(const SockException &ex) {} virtual void onPlayResult(const SockException &ex) {} @@ -136,9 +143,8 @@ protected: virtual void onResume(){}; }; -template -class PlayerImp : public Parent -{ +template +class PlayerImp : public Parent { public: typedef std::shared_ptr Ptr; @@ -147,62 +153,62 @@ public: virtual ~PlayerImp(){} void setOnShutdown(const function &cb) override { - if (_parser) { - _parser->setOnShutdown(cb); + if (_delegate) { + _delegate->setOnShutdown(cb); } _shutdownCB = cb; } void setOnPlayResult(const function &cb) override { - if (_parser) { - _parser->setOnPlayResult(cb); + if (_delegate) { + _delegate->setOnPlayResult(cb); } _playResultCB = cb; } void setOnResume(const function &cb) override { - if (_parser) { - _parser->setOnResume(cb); + if (_delegate) { + _delegate->setOnResume(cb); } _resumeCB = cb; } bool isInited(int analysisMs) override{ - if (_parser) { - return _parser->isInited(analysisMs); + if (_delegate) { + return _delegate->isInited(analysisMs); } - return PlayerBase::isInited(analysisMs); + return Parent::isInited(analysisMs); } float getDuration() const override { - if (_parser) { - return _parser->getDuration(); + if (_delegate) { + return _delegate->getDuration(); } - return PlayerBase::getDuration(); + return Parent::getDuration(); } float getProgress() const override{ - if (_parser) { - return _parser->getProgress(); + if (_delegate) { + return _delegate->getProgress(); } - return PlayerBase::getProgress(); + return Parent::getProgress(); } void seekTo(float fProgress) override{ - if (_parser) { - return _parser->seekTo(fProgress); + if (_delegate) { + return _delegate->seekTo(fProgress); } - return PlayerBase::seekTo(fProgress); + return Parent::seekTo(fProgress); } void setMediaSouce(const MediaSource::Ptr & src) override { - if (_parser) { - _parser->setMediaSouce(src); + if (_delegate) { + _delegate->setMediaSouce(src); } _pMediaSrc = src; } vector getTracks(bool trackReady = true) const override{ - if (_parser) { - return _parser->getTracks(trackReady); + if (_delegate) { + return _delegate->getTracks(trackReady); } - return PlayerBase::getTracks(trackReady); + return Parent::getTracks(trackReady); } protected: void onShutdown(const SockException &ex) override { @@ -228,7 +234,7 @@ protected: function _shutdownCB; function _playResultCB; function _resumeCB; - std::shared_ptr _parser; + std::shared_ptr _delegate; MediaSource::Ptr _pMediaSrc; }; diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 075394c9..766db316 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -138,13 +138,13 @@ void PlayerProxy::play(const string &strUrlTmp) { MediaPlayer::play(strUrlTmp); MediaSource::Ptr mediaSource; - if(dynamic_pointer_cast(_parser)){ + if(dynamic_pointer_cast(_delegate)){ //rtsp拉流 GET_CONFIG(bool,directProxy,Rtsp::kDirectProxy); if(directProxy && _bEnableRtsp){ mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); } - } else if(dynamic_pointer_cast(_parser)){ + } else if(dynamic_pointer_cast(_delegate)){ //rtmp拉流 if(_bEnableRtmp){ mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index 2f8e81cc..55112dea 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -56,6 +56,11 @@ public: void onAllTrackReady(){ _mediaSouce->onGetMetaData(getMetadata()); } + + // 设置TrackSource + void setTrackSource(const std::weak_ptr &track_src){ + _mediaSouce->setTrackSource(track_src); + } private: RtmpMediaSource::Ptr _mediaSouce; }; diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h index 49e8da6a..b170a19f 100644 --- a/src/Rtmp/RtmpPlayerImp.h +++ b/src/Rtmp/RtmpPlayerImp.h @@ -67,18 +67,18 @@ private: if(_pRtmpMediaSrc){ _pRtmpMediaSrc->onGetMetaData(val); } - _parser.reset(new RtmpDemuxer(val)); + _delegate.reset(new RtmpDemuxer(val)); return true; } void onMediaData(const RtmpPacket::Ptr &chunkData) override { if(_pRtmpMediaSrc){ _pRtmpMediaSrc->onWrite(chunkData); } - if(!_parser){ + if(!_delegate){ //这个流没有metadata - _parser.reset(new RtmpDemuxer()); + _delegate.reset(new RtmpDemuxer()); } - _parser->inputRtmp(chunkData); + _delegate->inputRtmp(chunkData); } private: RtmpMediaSource::Ptr _pRtmpMediaSrc; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 671ff101..a9f62916 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -319,7 +319,7 @@ void RtmpSession::doPlayResponse(const string &err,const std::function weakSelf = dynamic_pointer_cast(shared_from_this()); - MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf,cb](const MediaSource::Ptr &src){ + MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf,cb](const MediaSource::Ptr &src){ auto rtmp_src = dynamic_pointer_cast(src); auto strongSelf = weakSelf.lock(); if(strongSelf){ diff --git a/src/Rtmp/RtmpToRtspMediaSource.h b/src/Rtmp/RtmpToRtspMediaSource.h index 4d7c864b..31df804f 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.h +++ b/src/Rtmp/RtmpToRtspMediaSource.h @@ -52,7 +52,8 @@ public: RtmpToRtspMediaSource(const string &vhost, const string &app, const string &id, - int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){ + int ringSize = 0) : + RtmpMediaSource(vhost, app, id,ringSize){ } virtual ~RtmpToRtspMediaSource(){} @@ -83,7 +84,7 @@ public: _muxer->addTrack(track); track->addDelegate(_muxer); } - _muxer->setListener(_listener); + _muxer->setListener(getListener()); } RtmpMediaSource::onWrite(pkt,key_pos); } diff --git a/src/Rtsp/RtspMediaSourceMuxer.h b/src/Rtsp/RtspMediaSourceMuxer.h index 9c496bd6..050e83c2 100644 --- a/src/Rtsp/RtspMediaSourceMuxer.h +++ b/src/Rtsp/RtspMediaSourceMuxer.h @@ -60,6 +60,11 @@ public: void onAllTrackReady(){ _mediaSouce->onGetSDP(getSdp()); } + + // 设置TrackSource + void setTrackSource(const std::weak_ptr &track_src){ + _mediaSouce->setTrackSource(track_src); + } private: RtspMediaSource::Ptr _mediaSouce; }; diff --git a/src/Rtsp/RtspPlayerImp.h b/src/Rtsp/RtspPlayerImp.h index 0bf77c34..8992bded 100644 --- a/src/Rtsp/RtspPlayerImp.h +++ b/src/Rtsp/RtspPlayerImp.h @@ -66,16 +66,16 @@ private: if(_pRtspMediaSrc){ _pRtspMediaSrc->onGetSDP(sdp); } - _parser.reset(new RtspDemuxer(sdp)); + _delegate.reset(new RtspDemuxer(sdp)); return true; } void onRecvRTP(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &track) override { if(_pRtspMediaSrc){ _pRtspMediaSrc->onWrite(rtp,true); } - _parser->inputRtp(rtp); + _delegate->inputRtp(rtp); - if(_maxAnalysisMS && _parser->isInited(_maxAnalysisMS)){ + if(_maxAnalysisMS && _delegate->isInited(_maxAnalysisMS)){ PlayerImp::onPlayResult(SockException(Err_success,"play rtsp success")); _maxAnalysisMS = 0; } @@ -87,7 +87,7 @@ private: //如果超过这个时间还未获取成功,那么会强制触发onPlayResult事件(虽然此时有些track还未初始化成功) void onPlayResult(const SockException &ex) override { //isInited判断条件:无超时 - if(ex || _parser->isInited(0)){ + if(ex || _delegate->isInited(0)){ //已经初始化成功,说明sdp里面有完善的信息 PlayerImp::onPlayResult(ex); }else{ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index cb138308..1d679e23 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -371,7 +371,7 @@ void RtspSession::handleReq_Describe(const Parser &parser) { void RtspSession::onAuthSuccess() { TraceP(this); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - MediaSource::findAsync(_mediaInfo,weakSelf.lock(), true,[weakSelf](const MediaSource::Ptr &src){ + MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf](const MediaSource::Ptr &src){ auto strongSelf = weakSelf.lock(); if(!strongSelf){ return; diff --git a/src/Rtsp/RtspToRtmpMediaSource.h b/src/Rtsp/RtspToRtmpMediaSource.h index a05cb393..c4cdaf18 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.h +++ b/src/Rtsp/RtspToRtmpMediaSource.h @@ -43,7 +43,8 @@ public: RtspToRtmpMediaSource(const string &vhost, const string &app, const string &id, - int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) { + int ringSize = 0) + : RtspMediaSource(vhost, app, id,ringSize) { } virtual ~RtspToRtmpMediaSource() {} @@ -69,7 +70,7 @@ public: _muxer->addTrack(track); track->addDelegate(_muxer); } - _muxer->setListener(_listener); + _muxer->setListener(getListener()); } } RtspMediaSource::onWrite(rtp, bKeyPos); diff --git a/src/Shell/ShellCMD.h b/src/Shell/ShellCMD.h index eef6b653..e3a44901 100644 --- a/src/Shell/ShellCMD.h +++ b/src/Shell/ShellCMD.h @@ -16,39 +16,35 @@ class CMD_media: public CMD { public: CMD_media(){ _parser.reset(new OptionParser([](const std::shared_ptr &stream,mINI &ini){ - MediaSource::for_each_media([&](const string &schema, - const string &vhost, - const string &app, - const string &streamid, - const MediaSource::Ptr &media){ - if(!ini["schema"].empty() && ini["schema"] != schema){ + MediaSource::for_each_media([&](const MediaSource::Ptr &media){ + if(!ini["schema"].empty() && ini["schema"] != media->getSchema()){ //筛选协议不匹配 return; } - if(!ini["vhost"].empty() && ini["vhost"] != vhost){ + if(!ini["vhost"].empty() && ini["vhost"] != media->getVhost()){ //筛选虚拟主机不匹配 return; } - if(!ini["app"].empty() && ini["app"] != app){ + if(!ini["app"].empty() && ini["app"] != media->getApp()){ //筛选应用名不匹配 return; } - if(!ini["stream"].empty() && ini["stream"] != streamid){ + if(!ini["stream"].empty() && ini["stream"] != media->getId()){ //流id不匹配 return; } if(ini.find("list") != ini.end()){ //列出源 (*stream) << "\t" - << schema << "/" - << vhost << "/" - << app << "/" - << streamid + << media->getSchema() << "/" + << media->getVhost() << "/" + << media->getApp() << "/" + << media->getId() << "\r\n"; return; } - EventPollerPool::Instance().getPoller()->async([ini,media,stream,schema,vhost,app,streamid](){ + EventPollerPool::Instance().getPoller()->async([ini,media,stream](){ if(ini.find("kick") != ini.end()){ //踢出源 do{ @@ -59,18 +55,18 @@ public: break; } (*stream) << "\t踢出成功:" - << schema << "/" - << vhost << "/" - << app << "/" - << streamid + << media->getSchema() << "/" + << media->getVhost() << "/" + << media->getApp() << "/" + << media->getId() << "\r\n"; return; }while(0); (*stream) << "\t踢出失败:" - << schema << "/" - << vhost << "/" - << app << "/" - << streamid + << media->getSchema() << "/" + << media->getVhost() << "/" + << media->getApp() << "/" + << media->getId() << "\r\n"; } },false);