diff --git a/Android/app/src/main/cpp/test_server.cpp b/Android/app/src/main/cpp/test_server.cpp index 5c17b097..2867919b 100644 --- a/Android/app/src/main/cpp/test_server.cpp +++ b/Android/app/src/main/cpp/test_server.cpp @@ -102,112 +102,158 @@ static onceToken token1([](){ #define REALM "realm_zlmedaikit" static map s_mapFlvRecorder; static mutex s_mtxFlvRecorder; -static onceToken s_token([](){ - //监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastOnGetRtspRealm,[](BroadcastOnGetRtspRealmArgs){ - DebugL << "RTSP是否需要鉴权事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - if(string("1") == args._streamid ){ - // live/1需要认证 - //该流需要认证,并且设置realm - invoker(REALM); - }else{ - //有时我们要查询redis或数据库来判断该流是否需要认证,通过invoker的方式可以做到完全异步 - //该流我们不需要认证 - invoker(""); - } - }); - //监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastOnRtspAuth,[](BroadcastOnRtspAuthArgs){ - DebugL << "RTSP播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - DebugL << "RTSP用户:" << user_name << (must_no_encrypt ? " Base64" : " MD5" )<< " 方式登录"; - string user = user_name; - //假设我们异步读取数据库 - if(user == "test0"){ - //假设数据库保存的是明文 - invoker(false,"pwd0"); - return; - } +static void initEvent() { + static onceToken s_token([]() { + //监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnGetRtspRealm, + [](BroadcastOnGetRtspRealmArgs) { + DebugL << "RTSP是否需要鉴权事件:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + if (string("1") == args._streamid) { + // live/1需要认证 + //该流需要认证,并且设置realm + invoker(REALM); + } else { + //有时我们要查询redis或数据库来判断该流是否需要认证,通过invoker的方式可以做到完全异步 + //该流我们不需要认证 + invoker(""); + } + }); - if(user == "test1"){ - //假设数据库保存的是密文 - auto encrypted_pwd = MD5(user + ":" + REALM + ":" + "pwd1").hexdigest(); - invoker(true,encrypted_pwd); - return; - } - if(user == "test2" && must_no_encrypt){ - //假设登录的是test2,并且以base64方式登录,此时我们提供加密密码,那么会导致认证失败 - //可以通过这个方式屏蔽base64这种不安全的加密方式 - invoker(true,"pwd2"); - return; - } + //监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnRtspAuth, + [](BroadcastOnRtspAuthArgs) { + DebugL << "RTSP播放鉴权:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + DebugL << "RTSP用户:" << user_name + << (must_no_encrypt ? " Base64" : " MD5") + << " 方式登录"; + string user = user_name; + //假设我们异步读取数据库 + if (user == "test0") { + //假设数据库保存的是明文 + invoker(false, "pwd0"); + return; + } - //其他用户密码跟用户名一致 - invoker(false,user); - }); + if (user == "test1") { + //假设数据库保存的是密文 + auto encrypted_pwd = MD5( + user + ":" + REALM + ":" + + "pwd1").hexdigest(); + invoker(true, encrypted_pwd); + return; + } + if (user == "test2" && must_no_encrypt) { + //假设登录的是test2,并且以base64方式登录,此时我们提供加密密码,那么会导致认证失败 + //可以通过这个方式屏蔽base64这种不安全的加密方式 + invoker(true, "pwd2"); + return; + } + + //其他用户密码跟用户名一致 + invoker(false, user); + }); - //监听rtsp/rtmp推流事件,返回结果告知是否有推流权限 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ - DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - invoker("");//鉴权成功 - //invoker("this is auth failed message");//鉴权失败 - }); + //监听rtsp/rtmp推流事件,返回结果告知是否有推流权限 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, + [](BroadcastMediaPublishArgs) { + DebugL << "推流鉴权:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + invoker("");//鉴权成功 + //invoker("this is auth failed message");//鉴权失败 + }); - //监听rtsp/rtsps/rtmp/http-flv播放事件,返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权) - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPlayed,[](BroadcastMediaPlayedArgs){ - DebugL << "播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - invoker("");//鉴权成功 - //invoker("this is auth failed message");//鉴权失败 - }); + //监听rtsp/rtsps/rtmp/http-flv播放事件,返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权) + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPlayed, + [](BroadcastMediaPlayedArgs) { + DebugL << "播放鉴权:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + invoker("");//鉴权成功 + //invoker("this is auth failed message");//鉴权失败 + }); - //shell登录事件,通过shell可以登录进服务器执行一些命令 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){ - DebugL << "shell login:" << user_name << " " << passwd; - invoker("");//鉴权成功 - //invoker("this is auth failed message");//鉴权失败 - }); + //shell登录事件,通过shell可以登录进服务器执行一些命令 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastShellLogin, + [](BroadcastShellLoginArgs) { + DebugL << "shell login:" << user_name << " " + << passwd; + invoker("");//鉴权成功 + //invoker("this is auth failed message");//鉴权失败 + }); - //监听rtsp、rtmp源注册或注销事件;此处用于测试rtmp保存为flv录像,保存在http根目录下 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaChanged,[](BroadcastMediaChangedArgs){ - if(schema == RTMP_SCHEMA && app == "live"){ - lock_guard lck(s_mtxFlvRecorder); - if(bRegist){ - DebugL << "开始录制RTMP:" << schema << " " << vhost << " " << app << " " << stream; - GET_CONFIG(string,http_root,Http::kRootPath); - auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv"; - FlvRecorder::Ptr recorder(new FlvRecorder); - try{ - recorder->startRecord(EventPollerPool::Instance().getPoller(),dynamic_pointer_cast(sender.shared_from_this()),path); - s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder; - }catch(std::exception &ex){ - WarnL << ex.what(); - } - }else{ - s_mapFlvRecorder.erase(vhost + "/" + app + "/" + stream); - } - } - }); + //监听rtsp、rtmp源注册或注销事件;此处用于测试rtmp保存为flv录像,保存在http根目录下 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, + [](BroadcastMediaChangedArgs) { + if (schema == RTMP_SCHEMA && app == "live") { + lock_guard lck(s_mtxFlvRecorder); + if (bRegist) { + DebugL << "开始录制RTMP:" << schema << " " + << vhost << " " << app << " " + << stream; + GET_CONFIG(string, http_root, + Http::kRootPath); + auto path = http_root + "/" + vhost + "/" + + app + "/" + stream + "_" + + to_string(time(NULL)) + ".flv"; + FlvRecorder::Ptr recorder(new FlvRecorder); + try { + recorder->startRecord( + EventPollerPool::Instance().getPoller(), + dynamic_pointer_cast( + sender.shared_from_this()), + path); + s_mapFlvRecorder[vhost + "/" + app + + "/" + + stream] = recorder; + } catch (std::exception &ex) { + WarnL << ex.what(); + } + } else { + s_mapFlvRecorder.erase( + vhost + "/" + app + "/" + stream); + } + } + }); - //监听播放失败(未找到特定的流)事件 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){ - /** - * 你可以在这个事件触发时再去拉流,这样就可以实现按需拉流 - * 拉流成功后,ZLMediaKit会把其立即转发给播放器(最大等待时间约为5秒,如果5秒都未拉流成功,播放器会播放失败) - */ - DebugL << "未找到流事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - }); + //监听播放失败(未找到特定的流)事件 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, + [](BroadcastNotFoundStreamArgs) { + /** + * 你可以在这个事件触发时再去拉流,这样就可以实现按需拉流 + * 拉流成功后,ZLMediaKit会把其立即转发给播放器(最大等待时间约为5秒,如果5秒都未拉流成功,播放器会播放失败) + */ + DebugL << "未找到流事件:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + }); - //监听播放或推流结束时消耗流量事件 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){ - DebugL << "播放器(推流器)断开连接事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs - << "\r\n使用流量:" << totalBytes << " bytes,连接时长:" << totalDuration << "秒" ; + //监听播放或推流结束时消耗流量事件 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastFlowReport, + [](BroadcastFlowReportArgs) { + DebugL << "播放器(推流器)断开连接事件:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " << args._param_strs + << "\r\n使用流量:" << totalBytes + << " bytes,连接时长:" << totalDuration << "秒"; - }); + }); -}, nullptr); + }, nullptr); +} #if !defined(SIGHUP) #define SIGHUP 1 @@ -296,6 +342,7 @@ static int do_main(string ini_file) { TcpServer::Ptr rtspSSLSrv(new TcpServer()); rtspSSLSrv->start(rtspsPort);//默认322 + initEvent(); //服务器支持动态切换端口(不影响现有连接) NoticeCenter::Instance().addListener(ReloadConfigTag,Broadcast::kBroadcastReloadConfig,[&](BroadcastReloadConfigArgs){ //重新创建服务器 diff --git a/docker/Dockerfile b/docker/Dockerfile index 7b79cd9d..069b9301 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -36,7 +36,6 @@ WORKDIR /usr/src/ZLMediaKit ADD 3rdpart ./3rdpart ADD Android ./Android ADD cmake ./cmake -ADD node_modules ./node_modules ADD server ./server ADD src ./src ADD tests ./tests diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index b45104eb..570872f4 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -6,8 +6,8 @@ else() file(GLOB MediaServer_src_list ./*.cpp ./*.h) endif() -message(STATUS ${MediaServer_src_list}) -#add_compile_options(-l sqlite3) +#message(STATUS ${MediaServer_src_list}) + add_executable(MediaServer ${MediaServer_src_list}) if(WIN32) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 39fe1792..5c7569d0 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -504,7 +504,7 @@ void processProxyCfg(const Json::Value &proxyData, const bool initialize = false return; } - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, vApp, vStream, vEnableHls, vRecordMp4)); + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, vApp, vStream, true, true, vEnableHls, vRecordMp4)); m_s_proxyMap[proxyKey] = player; //指定RTP over TCP(播放rtsp时有效) @@ -1291,6 +1291,8 @@ void installWebApi() { const string &app, const string &stream, const string &url, + bool enable_rtsp, + bool enable_rtmp, bool enable_hls, bool enable_mp4, int rtp_type, @@ -1303,7 +1305,7 @@ void installWebApi() { return; } //添加拉流代理 - PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_hls,enable_mp4)); + PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_rtsp,enable_rtmp,enable_hls,enable_mp4)); s_proxyMap[key] = player; //指定RTP over TCP(播放rtsp时有效) @@ -1326,16 +1328,18 @@ void installWebApi() { }; //动态添加rtsp/rtmp拉流代理 - //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&stream=0&url=rtmp://127.0.0.1/live/obs + //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs API_REGIST_INVOKER(api,addStreamProxy,{ CHECK_SECRET(); - CHECK_ARGS("vhost","app","stream","url"); + CHECK_ARGS("vhost","app","stream","url","enable_rtsp","enable_rtmp"); addStreamProxy(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["url"], - allArgs["enable_hls"], - allArgs["enable_mp4"], + allArgs["enable_rtsp"],/* 是否rtsp转发 */ + allArgs["enable_rtmp"],/* 是否rtmp转发 */ + allArgs["enable_hls"],/* 是否hls转发 */ + allArgs["enable_mp4"],/* 是否MP4录制 */ allArgs["rtp_type"], [invoker,val,headerOut](const SockException &ex,const string &key){ if(ex){ @@ -1461,7 +1465,7 @@ void installWebApi() { #if !defined(_WIN32) - API_REGIST_INVOKER(hook,on_stream_not_found,{ + API_REGIST_INVOKER(hook,on_stream_not_found_ffmpeg,{ //媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流 /* CHECK_SECRET(); CHECK_ARGS("vhost","app","stream"); @@ -1500,9 +1504,11 @@ void installWebApi() { allArgs["app"], allArgs["stream"], /** 支持rtsp和rtmp方式拉流 ,rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/ - "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov",//rtmp://live.hkstv.hk.lxdns.com/live/hks2 - false, - false, + "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov", + true,/* 开启rtsp转发 */ + true,/* 开启rtmp转发 */ + true,/* 开启hls转发 */ + false,/* 禁用MP4录制 */ 0,//rtp over tcp方式拉流 [invoker,val,headerOut](const SockException &ex,const string &key){ if(ex){ diff --git a/src/Common/Device.cpp b/src/Common/Device.cpp index 160b1fbe..682c2c4c 100644 --- a/src/Common/Device.cpp +++ b/src/Common/Device.cpp @@ -41,10 +41,12 @@ DevChannel::DevChannel(const string &strVhost, const string &strApp, const string &strId, float fDuration, + bool bEanbleRtsp, + bool bEanbleRtmp, bool bEanbleHls, //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int bRecordMp4) : - MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleHls, bRecordMp4) {} + MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleRtsp, bEanbleRtmp, bEanbleHls, bRecordMp4) {} DevChannel::~DevChannel() {} @@ -87,13 +89,14 @@ void DevChannel::inputPCM(char* pcData, int iDataLen, uint32_t uiStamp) { } #endif //ENABLE_FAAC -void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t uiStamp) { - if(uiStamp == 0){ - uiStamp = (uint32_t)_aTicker[0].elapsedTime(); +void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32_t pts) { + if(dts == 0){ + dts = (uint32_t)_aTicker[0].elapsedTime(); } - + if(pts == 0){ + pts = dts; + } int prefixeSize; - if (memcmp("\x00\x00\x00\x01", pcData, 4) == 0) { prefixeSize = 4; } else if (memcmp("\x00\x00\x01", pcData, 3) == 0) { @@ -101,8 +104,7 @@ void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t uiStamp) { } else { prefixeSize = 0; } - - inputFrame(std::make_shared((char *)pcData,iDataLen,uiStamp,prefixeSize)); + inputFrame(std::make_shared((char *)pcData,iDataLen,dts,pts,prefixeSize)); } void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) { @@ -118,12 +120,12 @@ void DevChannel::inputAAC(const char *pcDataWithoutAdts,int iDataLen, uint32_t u uiStamp = (uint32_t)_aTicker[1].elapsedTime(); } if(pcAdtsHeader + 7 == pcDataWithoutAdts){ - inputFrame(std::make_shared((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,7)); + inputFrame(std::make_shared((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,7)); } else { char *dataWithAdts = new char[iDataLen + 7]; memcpy(dataWithAdts,pcAdtsHeader,7); memcpy(dataWithAdts + 7 , pcDataWithoutAdts , iDataLen); - inputFrame(std::make_shared(dataWithAdts,iDataLen + 7,uiStamp,7)); + inputFrame(std::make_shared(dataWithAdts,iDataLen + 7,uiStamp,7)); delete [] dataWithAdts; } } diff --git a/src/Common/Device.h b/src/Common/Device.h index a237bb53..014179b2 100644 --- a/src/Common/Device.h +++ b/src/Common/Device.h @@ -74,6 +74,8 @@ public: const string &strApp, const string &strId, float fDuration = 0, + bool bEanbleRtsp = true, + bool bEanbleRtmp = true, bool bEanbleHls = true, //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int bRecordMp4 = 0); @@ -98,9 +100,10 @@ public: * 输入264帧 * @param pcData 264单帧数据指针 * @param iDataLen 数据指针长度 - * @param uiStamp 时间戳,单位毫秒;等于0时内部会自动生成时间戳 + * @param dts 解码时间戳,单位毫秒;等于0时内部会自动生成时间戳 + * @param pts 播放时间戳,单位毫秒;等于0时内部会赋值为dts */ - void inputH264(const char *pcData, int iDataLen, uint32_t uiStamp); + void inputH264(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0); /** * 输入可能带adts头的aac帧 diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 0be8901a..7f1535a6 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -241,9 +241,17 @@ void MediaInfo::parse(const string &url){ } void MediaSourceEvent::onNoneReader(MediaSource &sender){ - WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); //没有任何读取器消费该源,表明该源可以关闭了 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender); + WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); + weak_ptr weakPtr = sender.shared_from_this(); + + //异步广播该事件,防止同步调用sender.close()导致在接收rtp或rtmp包时清空包缓存等操作 + EventPollerPool::Instance().getPoller()->async([weakPtr](){ + auto strongPtr = weakPtr.lock(); + if(strongPtr){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,*strongPtr); + } + },false); } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index b3b51be9..a973cba2 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -39,13 +39,19 @@ public: const string &strApp, const string &strId, float dur_sec = 0.0, + bool bEanbleRtsp = true, + bool bEanbleRtmp = true, bool bEanbleHls = true, //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 - int bRecordMp4 = false){ - _rtmp = std::make_shared(vhost,strApp,strId,std::make_shared(dur_sec)); - _rtsp = std::make_shared(vhost,strApp,strId,std::make_shared(dur_sec)); + int bRecordMp4 = false + ){ + if (bEanbleRtmp) { + _rtmp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); + } + if (bEanbleRtsp) { + _rtsp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); + } _record = std::make_shared(vhost,strApp,strId,bEanbleHls,bRecordMp4); - } virtual ~MultiMediaSourceMuxer(){} @@ -55,8 +61,12 @@ public: * @param track 媒体描述 */ void addTrack(const Track::Ptr & track) { - _rtmp->addTrack(track); - _rtsp->addTrack(track); + if(_rtmp){ + _rtmp->addTrack(track); + } + if(_rtsp){ + _rtsp->addTrack(track); + } _record->addTrack(track); } @@ -65,8 +75,12 @@ public: * @param frame 帧数据 */ void inputFrame(const Frame::Ptr &frame) override { - _rtmp->inputFrame(frame); - _rtsp->inputFrame(frame); + if(_rtmp) { + _rtmp->inputFrame(frame); + } + if(_rtsp) { + _rtsp->inputFrame(frame); + } _record->inputFrame(frame); } @@ -75,8 +89,12 @@ public: * @param listener */ void setListener(const std::weak_ptr &listener){ - _rtmp->setListener(listener); - _rtsp->setListener(listener); + if(_rtmp) { + _rtmp->setListener(listener); + } + if(_rtsp) { + _rtsp->setListener(listener); + } } /** @@ -84,11 +102,13 @@ public: * @return */ int readerCount() const{ - return _rtsp->readerCount() + _rtmp->readerCount(); + return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0); } void setTimeStamp(uint32_t stamp){ - _rtsp->setTimeStamp(stamp); + if(_rtsp){ + _rtsp->setTimeStamp(stamp); + } } private: RtmpMediaSourceMuxer::Ptr _rtmp; diff --git a/src/Common/Parser.h b/src/Common/Parser.h index fdbdc8f2..bfe2ceed 100644 --- a/src/Common/Parser.h +++ b/src/Common/Parser.h @@ -16,7 +16,9 @@ namespace mediakit{ string FindField(const char *buf, const char *start, const char *end, int bufSize = 0); struct StrCaseCompare { - bool operator()(const string &__x, const string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; } + bool operator()(const string &__x, const string &__y) const { + return strcasecmp(__x.data(), __y.data()) < 0; + } }; @@ -25,17 +27,19 @@ class StrCaseMap : public multimap{ typedef multimap Super ; StrCaseMap() = default; ~StrCaseMap() = default; - string &operator[](const string &key){ - auto it = find(key); + + template + string &operator[](K &&k){ + auto it = find(std::forward(k)); if(it == end()){ - it = Super::emplace(key,""); + it = Super::emplace(std::forward(k),""); } return it->second; } template void emplace(K &&k , V &&v) { - auto it = find(k); + auto it = find(std::forward(k)); if(it != end()){ return; } diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 0e19df23..42aedb2d 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -161,11 +161,13 @@ namespace Rtsp { const string kAuthBasic = RTSP_FIELD"authBasic"; const string kHandshakeSecond = RTSP_FIELD"handshakeSecond"; const string kKeepAliveSecond = RTSP_FIELD"keepAliveSecond"; +const string kDirectProxy = RTSP_FIELD"directProxy";; onceToken token([](){ //默认Md5方式认证 mINI::Instance()[kAuthBasic] = 0; mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15; + mINI::Instance()[kDirectProxy] = 1; },nullptr); } //namespace Rtsp diff --git a/src/Common/config.h b/src/Common/config.h index d396b5c7..8807d175 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -202,6 +202,13 @@ extern const string kAuthBasic; extern const string kHandshakeSecond; //维持链接超时时间,默认15秒 extern const string kKeepAliveSecond; + +//rtsp拉流代理是否直接代理 +//直接代理后支持任意编码格式,但是会导致GOP缓存无法定位到I帧,可能会导致开播花屏 +//并且如果是tcp方式拉流,如果rtp大于mtu会导致无法使用udp方式代理 +//假定您的拉流源地址不是264或265或AAC,那么你可以使用直接代理的方式来支持rtsp代理 +//默认开启rtsp直接代理,rtmp由于没有这些问题,是强制开启直接代理的 +extern const string kDirectProxy; } //namespace Rtsp ////////////RTMP服务器配置/////////// diff --git a/src/Extension/AAC.h b/src/Extension/AAC.h index 24832f8a..e445b7fe 100644 --- a/src/Extension/AAC.h +++ b/src/Extension/AAC.h @@ -105,15 +105,15 @@ public: uint32_t iPrefixSize = 7; } ; -class AACFrameNoCopyAble : public FrameNoCopyAble { +class AACFrameNoCacheAble : public FrameNoCacheAble { public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; - AACFrameNoCopyAble(char *ptr,uint32_t size,uint32_t stamp,int prefixeSize = 7){ - buffer_ptr = ptr; - buffer_size = size; - timeStamp = stamp; - iPrefixSize = prefixeSize; + AACFrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 7){ + _ptr = ptr; + _size = size; + _dts = dts; + _prefixSize = prefixeSize; } TrackType getTrackType() const override{ diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index 1658f0df..ced72b6e 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -118,8 +118,20 @@ Track::Ptr Factory::getTrackByCodecId(CodecId codecId) { RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) { GET_CONFIG(uint32_t,audio_mtu,Rtp::kAudioMtuSize); GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); - // ssrc不冲突即可 - uint32_t ssrc = ((uint64_t) sdp.get()) & 0xFFFFFFFF; + // ssrc不冲突即可,可以为任意的32位整形 + static atomic s_ssrc(0); + uint32_t ssrc = s_ssrc++; + if(!ssrc){ + //ssrc不能为0 + ssrc = 1; + } + if(sdp->getTrackType() == TrackVideo){ + //视频的ssrc是偶数,方便调试 + ssrc = 2 * ssrc; + }else{ + //音频ssrc是奇数 + ssrc = 2 * ssrc + 1; + } auto mtu = (sdp->getTrackType() == TrackVideo ? video_mtu : audio_mtu); auto sample_rate = sdp->getSampleRate(); auto pt = sdp->getPlayloadType(); diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp new file mode 100644 index 00000000..0b6f5232 --- /dev/null +++ b/src/Extension/Frame.cpp @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "Frame.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit{ + +Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){ + if(frame->cacheAble()){ + return frame; + } + return std::make_shared(frame); +} + +}//namespace mediakit + diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index 7995be3c..5d49753a 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -77,7 +77,7 @@ public: /** * 帧类型的抽象接口 */ -class Frame : public Buffer, public CodecInfo{ +class Frame : public Buffer, public CodecInfo { public: typedef std::shared_ptr Ptr; virtual ~Frame(){} @@ -116,6 +116,17 @@ public: * @return */ virtual bool keyFrame() const = 0; + + /** + * 是否可以缓存 + */ + virtual bool cacheAble() const { return true; } + + /** + * 返回可缓存的frame + * @return + */ + static Ptr getCacheAbleFrame(const Ptr &frame); }; /** @@ -281,26 +292,114 @@ private: map _delegateMap; }; -class FrameNoCopyAble : public Frame{ +/** + * 通过Frame接口包装指针,方便使用者把自己的数据快速接入ZLMediaKit + */ +class FrameFromPtr : public Frame{ public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; char *data() const override{ - return buffer_ptr; + return _ptr; } uint32_t size() const override { - return buffer_size; + return _size; } + uint32_t dts() const override { - return timeStamp; + return _dts; } + + uint32_t pts() const override{ + if(_pts){ + return _pts; + } + return dts(); + } + uint32_t prefixSize() const override{ - return iPrefixSize; + return _prefixSize; } +protected: + char *_ptr; + uint32_t _size; + uint32_t _dts; + uint32_t _pts = 0; + uint32_t _prefixSize; +}; + +/** + * 不可缓存的帧,在DevChannel类中有用到。 + * 该帧类型用于防止内存拷贝,直接使用指针传递数据 + * 在大多数情况下,ZLMediaKit是同步对帧数据进行使用和处理的 + * 所以提供此类型的帧很有必要,但是有时又无法避免缓存帧做后续处理 + * 所以可以通过Frame::getCacheAbleFrame方法拷贝一个可缓存的帧 + */ +class FrameNoCacheAble : public FrameFromPtr{ public: - char *buffer_ptr; - uint32_t buffer_size; - uint32_t timeStamp; - uint32_t iPrefixSize; + typedef std::shared_ptr Ptr; + + /** + * 该帧不可缓存 + * @return + */ + bool cacheAble() const override { + return false; + } +}; + +/** + * 该对象的功能是把一个不可缓存的帧转换成可缓存的帧 + * @see FrameNoCacheAble + */ +class FrameCacheAble : public FrameFromPtr { +public: + typedef std::shared_ptr Ptr; + + FrameCacheAble(const Frame::Ptr &frame){ + if(frame->cacheAble()){ + _frame = frame; + _ptr = frame->data(); + }else{ + _buffer = std::make_shared(); + _buffer->assign(frame->data(),frame->size()); + _ptr = _buffer->data(); + } + _size = frame->size(); + _dts = frame->dts(); + _pts = frame->pts(); + _prefixSize = frame->prefixSize(); + _trackType = frame->getTrackType(); + _codec = frame->getCodecId(); + _key = frame->keyFrame(); + } + + virtual ~FrameCacheAble() = default; + + /** + * 可以被缓存 + * @return + */ + bool cacheAble() const override { + return true; + } + + TrackType getTrackType() const override{ + return _trackType; + } + + CodecId getCodecId() const override{ + return _codec; + } + + bool keyFrame() const override{ + return _key; + } +private: + Frame::Ptr _frame; + BufferRaw::Ptr _buffer; + TrackType _trackType; + CodecId _codec; + bool _key; }; diff --git a/src/Extension/H264.h b/src/Extension/H264.h index eebfec63..853c7f60 100644 --- a/src/Extension/H264.h +++ b/src/Extension/H264.h @@ -92,15 +92,21 @@ public: }; -class H264FrameNoCopyAble : public FrameNoCopyAble { +/** + * 防止内存拷贝的H264类 + * 用户可以通过该类型快速把一个指针无拷贝的包装成Frame类 + * 该类型在DevChannel中有使用 + */ +class H264FrameNoCacheAble : public FrameNoCacheAble { public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; - H264FrameNoCopyAble(char *ptr,uint32_t size,uint32_t stamp,int prefixeSize = 4){ - buffer_ptr = ptr; - buffer_size = size; - timeStamp = stamp; - iPrefixSize = prefixeSize; + H264FrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts , uint32_t pts ,int prefixeSize = 4){ + _ptr = ptr; + _size = size; + _dts = dts; + _pts = pts; + _prefixSize = prefixeSize; } TrackType getTrackType() const override{ @@ -112,23 +118,30 @@ public: } bool keyFrame() const override { - return H264_TYPE(buffer_ptr[iPrefixSize]) == H264Frame::NAL_IDR; + return H264_TYPE(_ptr[_prefixSize]) == H264Frame::NAL_IDR; } }; -class H264FrameSubFrame : public H264FrameNoCopyAble{ +/** + * 一个H264Frame类中可以有多个帧,他们通过 0x 00 00 01 分隔 + * ZLMediaKit会先把这种复合帧split成单个帧然后再处理 + * 一个复合帧可以通过无内存拷贝的方式切割成多个H264FrameSubFrame + * 提供该类的目的是切换复合帧时防止内存拷贝,提高性能 + */ +class H264FrameSubFrame : public H264FrameNoCacheAble{ public: typedef std::shared_ptr Ptr; - - H264FrameSubFrame(const Frame::Ptr &strongRef, - char *ptr, - uint32_t size, - uint32_t stamp, - int prefixeSize) : H264FrameNoCopyAble(ptr,size,stamp,prefixeSize){ - _strongRef = strongRef; + H264FrameSubFrame(const Frame::Ptr &parent_frame, + char *ptr, + uint32_t size, + int prefixeSize) : H264FrameNoCacheAble(ptr,size,parent_frame->dts(),parent_frame->pts(),prefixeSize){ + _parent_frame = parent_frame; + } + bool cacheAble() const override { + return _parent_frame->cacheAble(); } private: - Frame::Ptr _strongRef; + Frame::Ptr _parent_frame; }; /** @@ -234,7 +247,6 @@ public: H264FrameSubFrame::Ptr sub_frame = std::make_shared(frame, frame->data(), len + frame->prefixSize(), - frame->stamp(), frame->prefixSize()); inputFrame_l(sub_frame); first_frame = false; @@ -242,7 +254,6 @@ public: H264FrameSubFrame::Ptr sub_frame = std::make_shared(frame, (char *)ptr, len , - frame->stamp(), 3); inputFrame_l(sub_frame); } @@ -311,27 +322,23 @@ private: } if(!_sps.empty()){ - if(!_spsFrame){ - _spsFrame = std::make_shared(); - _spsFrame->type = H264Frame::NAL_SPS; - _spsFrame->iPrefixSize = 4; - } - _spsFrame->buffer.assign("\x0\x0\x0\x1",4); - _spsFrame->buffer.append(_sps); - _spsFrame->timeStamp = frame->stamp(); - VideoTrack::inputFrame(_spsFrame); + auto spsFrame = std::make_shared(); + spsFrame->type = H264Frame::NAL_SPS; + spsFrame->iPrefixSize = 4; + spsFrame->buffer.assign("\x0\x0\x0\x1",4); + spsFrame->buffer.append(_sps); + spsFrame->timeStamp = frame->stamp(); + VideoTrack::inputFrame(spsFrame); } if(!_pps.empty()){ - if(!_ppsFrame) { - _ppsFrame = std::make_shared(); - _ppsFrame->type = H264Frame::NAL_PPS; - _ppsFrame->iPrefixSize = 4; - } - _ppsFrame->buffer.assign("\x0\x0\x0\x1",4); - _ppsFrame->buffer.append(_pps); - _ppsFrame->timeStamp = frame->stamp(); - VideoTrack::inputFrame(_ppsFrame); + auto ppsFrame = std::make_shared(); + ppsFrame->type = H264Frame::NAL_PPS; + ppsFrame->iPrefixSize = 4; + ppsFrame->buffer.assign("\x0\x0\x0\x1",4); + ppsFrame->buffer.append(_pps); + ppsFrame->timeStamp = frame->stamp(); + VideoTrack::inputFrame(ppsFrame); } } private: @@ -341,8 +348,6 @@ private: int _height = 0; float _fps = 0; bool _last_frame_is_idr = false; - H264Frame::Ptr _spsFrame; - H264Frame::Ptr _ppsFrame; }; diff --git a/src/Extension/H264Rtmp.cpp b/src/Extension/H264Rtmp.cpp index 3adb0b7e..8f735fe3 100644 --- a/src/Extension/H264Rtmp.cpp +++ b/src/Extension/H264Rtmp.cpp @@ -108,6 +108,7 @@ inline void H264RtmpDecoder::onGetH264_l(const char* pcData, int iLen, uint32_t } } inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dts,uint32_t pts) { +#if 1 _h264frame->type = H264_TYPE(pcData[0]); _h264frame->timeStamp = dts; _h264frame->ptsStamp = pts; @@ -117,6 +118,11 @@ inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dt //写入环形缓存 RtmpCodec::inputFrame(_h264frame); _h264frame = obtainFrame(); +#else + //防止内存拷贝,这样产生的264帧不会有0x00 00 01头 + auto frame = std::make_shared((char *)pcData,iLen,dts,pts,0); + RtmpCodec::inputFrame(frame); +#endif } diff --git a/src/Extension/H265.h b/src/Extension/H265.h index 32892ed9..9ba82458 100644 --- a/src/Extension/H265.h +++ b/src/Extension/H265.h @@ -121,15 +121,16 @@ public: }; -class H265FrameNoCopyAble : public FrameNoCopyAble { +class H265FrameNoCacheAble : public FrameNoCacheAble { public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; - H265FrameNoCopyAble(char *ptr, uint32_t size, uint32_t stamp, int prefixeSize = 4) { - buffer_ptr = ptr; - buffer_size = size; - timeStamp = stamp; - iPrefixSize = prefixeSize; + H265FrameNoCacheAble(char *ptr, uint32_t size, uint32_t dts,uint32_t pts, int prefixeSize = 4) { + _ptr = ptr; + _size = size; + _dts = dts; + _pts = pts; + _prefixSize = prefixeSize; } TrackType getTrackType() const override { @@ -141,7 +142,7 @@ public: } bool keyFrame() const override { - int type = H265_TYPE(((uint8_t *) buffer_ptr)[iPrefixSize]); + int type = H265_TYPE(((uint8_t *) _ptr)[_prefixSize]); return H265Frame::isKeyFrame(type); } }; @@ -263,47 +264,38 @@ private: return; } if(!_vps.empty()){ - if (!_vpsFrame) { - _vpsFrame = std::make_shared(); - _vpsFrame->type = H265Frame::NAL_VPS; - _vpsFrame->iPrefixSize = 4; - } - _vpsFrame->buffer.assign("\x0\x0\x0\x1", 4); - _vpsFrame->buffer.append(_vps); - _vpsFrame->timeStamp = frame->stamp(); - VideoTrack::inputFrame(_vpsFrame); + auto vpsFrame = std::make_shared(); + vpsFrame->type = H265Frame::NAL_VPS; + vpsFrame->iPrefixSize = 4; + vpsFrame->buffer.assign("\x0\x0\x0\x1", 4); + vpsFrame->buffer.append(_vps); + vpsFrame->timeStamp = frame->stamp(); + VideoTrack::inputFrame(vpsFrame); } if (!_sps.empty()) { - if (!_spsFrame) { - _spsFrame = std::make_shared(); - _spsFrame->type = H265Frame::NAL_SPS; - _spsFrame->iPrefixSize = 4; - } - _spsFrame->buffer.assign("\x0\x0\x0\x1", 4); - _spsFrame->buffer.append(_sps); - _spsFrame->timeStamp = frame->stamp(); - VideoTrack::inputFrame(_spsFrame); + auto spsFrame = std::make_shared(); + spsFrame->type = H265Frame::NAL_SPS; + spsFrame->iPrefixSize = 4; + spsFrame->buffer.assign("\x0\x0\x0\x1", 4); + spsFrame->buffer.append(_sps); + spsFrame->timeStamp = frame->stamp(); + VideoTrack::inputFrame(spsFrame); } if (!_pps.empty()) { - if (!_ppsFrame) { - _ppsFrame = std::make_shared(); - _ppsFrame->type = H265Frame::NAL_PPS; - _ppsFrame->iPrefixSize = 4; - } - _ppsFrame->buffer.assign("\x0\x0\x0\x1", 4); - _ppsFrame->buffer.append(_pps); - _ppsFrame->timeStamp = frame->stamp(); - VideoTrack::inputFrame(_ppsFrame); + auto ppsFrame = std::make_shared(); + ppsFrame->type = H265Frame::NAL_PPS; + ppsFrame->iPrefixSize = 4; + ppsFrame->buffer.assign("\x0\x0\x0\x1", 4); + ppsFrame->buffer.append(_pps); + ppsFrame->timeStamp = frame->stamp(); + VideoTrack::inputFrame(ppsFrame); } } private: string _vps; string _sps; string _pps; - H265Frame::Ptr _vpsFrame; - H265Frame::Ptr _spsFrame; - H265Frame::Ptr _ppsFrame; bool _last_frame_is_idr = false; }; diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 72b1c2db..3f98c60c 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -39,6 +39,7 @@ HttpClient::~HttpClient() { void HttpClient::sendRequest(const string &strUrl, float fTimeOutSec) { _aliveTicker.resetTime(); + _url = strUrl; auto protocol = FindField(strUrl.data(), NULL, "://"); uint16_t defaultPort; bool isHttps; @@ -148,6 +149,20 @@ void HttpClient::onErr(const SockException &ex) { int64_t HttpClient::onRecvHeader(const char *data, uint64_t len) { _parser.Parse(data); + if(_parser.Url() == "302" || _parser.Url() == "301"){ + auto newUrl = _parser["Location"]; + if(newUrl.empty()){ + shutdown(SockException(Err_shutdown,"未找到Location字段(跳转url)")); + return 0; + } + if(onRedirectUrl(newUrl,_parser.Url() == "302")){ + HttpClient::clear(); + setMethod("GET"); + HttpClient::sendRequest(newUrl,_fTimeOutSec); + return 0; + } + } + checkCookie(_parser.getValues()); _totalBodySize = onResponseHeader(_parser.Url(), _parser.getValues()); diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 492100e0..1f8287e5 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -221,7 +221,6 @@ public: _recvedBodySize = 0; _totalBodySize = 0; _aliveTicker.resetTime(); - _fTimeOutSec = 0; _chunkedSplitter.reset(); HttpRequestSplitter::reset(); } @@ -255,6 +254,10 @@ public: const Parser& response() const{ return _parser; } + + const string &getUrl() const{ + return _url; + } protected: /** * 收到http回复头 @@ -293,6 +296,14 @@ protected: */ virtual void onDisconnect(const SockException &ex){} + /** + * 重定向事件 + * @param url 重定向url + * @param temporary 是否为临时重定向 + * @return 是否继续 + */ + virtual bool onRedirectUrl(const string &url,bool temporary){ return true;}; + //HttpRequestSplitter override int64_t onRecvHeader(const char *data,uint64_t len) override ; void onRecvContent(const char *data,uint64_t len) override; @@ -308,6 +319,7 @@ private: protected: bool _isHttps; private: + string _url; HttpHeader _header; HttpBody::Ptr _body; string _method; diff --git a/src/Http/HttpRequestSplitter.cpp b/src/Http/HttpRequestSplitter.cpp index 4a6fd7e4..fe9779d0 100644 --- a/src/Http/HttpRequestSplitter.cpp +++ b/src/Http/HttpRequestSplitter.cpp @@ -59,24 +59,23 @@ void HttpRequestSplitter::input(const char *data,uint64_t len) { //_content_len == 0,这是请求头 const char *header_ptr = ptr; int64_t header_size = index - ptr; - ptr = index; _remain_data_size = len - (ptr - data); - _content_len = onRecvHeader(header_ptr, header_size); } - /* - * 恢复末尾字节 - */ - tail_ref = tail_tmp; - if(_remain_data_size <= 0){ //没有剩余数据,清空缓存 _remain_data.clear(); return; } + /* + * 恢复末尾字节 + * 移动到这来,目的是防止HttpRequestSplitter::reset()导致内存失效 + */ + tail_ref = tail_tmp; + if(_content_len == 0){ //尚未找到http头,缓存定位到剩余数据部分 string str(ptr,_remain_data_size); diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 73f4bf45..3fd111c0 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -933,7 +933,11 @@ void HttpSession::responseDelay(const string &Origin,bool bClose, headerOther["Access-Control-Allow-Origin"] = Origin; headerOther["Access-Control-Allow-Credentials"] = "true"; } - const_cast(headerOut).insert(headerOther.begin(), headerOther.end()); + + for (auto &pr : headerOther){ + //添加默认http头,默认http头不能覆盖用户自定义的头 + const_cast(headerOut).emplace(pr.first,pr.second); + } sendResponse(codeOut.data(), headerOut, contentOut); } inline void HttpSession::sendNotFound(bool bClose) { diff --git a/src/MediaFile/MediaReader.cpp b/src/MediaFile/MediaReader.cpp index 8cdd3fae..8bf100b8 100644 --- a/src/MediaFile/MediaReader.cpp +++ b/src/MediaFile/MediaReader.cpp @@ -37,6 +37,7 @@ namespace mediakit { #ifdef ENABLE_MP4V2 MediaReader::MediaReader(const string &strVhost,const string &strApp, const string &strId,const string &filePath ) { + _poller = EventPollerPool::Instance().getPoller(); auto strFileName = filePath; if(strFileName.empty()){ GET_CONFIG(string,recordPath,Record::kFilePath); @@ -137,7 +138,7 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri } _iDuration = MAX(_video_ms,_audio_ms); - _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost,strApp,strId,_iDuration/1000.0,false, false)); + _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost, strApp, strId, _iDuration / 1000.0, true, true, false, false)); if (_audio_trId != MP4_INVALID_TRACK_ID) { AACTrack::Ptr track = std::make_shared(_strAacCfg); _mediaMuxer->addTrack(track); @@ -164,7 +165,7 @@ void MediaReader::startReadMP4() { _timer = std::make_shared(sampleMS / 1000.0f,[strongSelf](){ return strongSelf->readSample(0,false); - }, nullptr); + }, _poller); //先读sampleMS毫秒的数据用于产生MediaSouce readSample(sampleMS, false); @@ -221,7 +222,7 @@ inline bool MediaReader::readVideoSample(int iTimeInc,bool justSeekSyncFrame) { break; } memcpy(pBytes + iOffset, "\x0\x0\x0\x1", 4); - writeH264(pBytes + iOffset, iFrameLen + 4, (double) _video_ms * iIdx / _video_num_samples); + writeH264(pBytes + iOffset, iFrameLen + 4, (double) _video_ms * iIdx / _video_num_samples, 0); iOffset += (iFrameLen + 4); } }else if(_bSyncSample){ @@ -259,12 +260,12 @@ inline bool MediaReader::readAudioSample(int iTimeInc,bool justSeekSyncFrame) { return false; } -inline void MediaReader::writeH264(uint8_t *pucData,int iLen,uint32_t uiStamp) { - _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,uiStamp)); +inline void MediaReader::writeH264(uint8_t *pucData,int iLen,uint32_t dts,uint32_t pts) { + _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,dts,pts)); } inline void MediaReader::writeAAC(uint8_t *pucData,int iLen,uint32_t uiStamp) { - _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,uiStamp)); + _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,uiStamp)); } inline MP4SampleId MediaReader::getVideoSampleId(int iTimeInc ) { diff --git a/src/MediaFile/MediaReader.h b/src/MediaFile/MediaReader.h index 2092d9fe..a2917367 100644 --- a/src/MediaFile/MediaReader.h +++ b/src/MediaFile/MediaReader.h @@ -97,7 +97,7 @@ private: bool readSample(int iTimeInc, bool justSeekSyncFrame); inline bool readVideoSample(int iTimeInc,bool justSeekSyncFrame); inline bool readAudioSample(int iTimeInc,bool justSeekSyncFrame); - inline void writeH264(uint8_t *pucData,int iLen,uint32_t uiStamp); + inline void writeH264(uint8_t *pucData,int iLen,uint32_t dts,uint32_t pts); inline void writeAAC(uint8_t *pucData,int iLen,uint32_t uiStamp); private: MP4FileHandle _hMP4File = MP4_INVALID_FILE_HANDLE; @@ -132,6 +132,7 @@ private: Ticker _alive; recursive_mutex _mtx; Timer::Ptr _timer; + EventPoller::Ptr _poller; #endif //ENABLE_MP4V2 }; diff --git a/src/MediaFile/Mp4Maker.cpp b/src/MediaFile/Mp4Maker.cpp index 59b345c4..293be0b1 100644 --- a/src/MediaFile/Mp4Maker.cpp +++ b/src/MediaFile/Mp4Maker.cpp @@ -25,7 +25,7 @@ */ #ifdef ENABLE_MP4V2 - +#include #include #include "Common/config.h" #include "Mp4Maker.h" diff --git a/src/MediaFile/TsMuxer.cpp b/src/MediaFile/TsMuxer.cpp index 5a16438b..2eabd145 100644 --- a/src/MediaFile/TsMuxer.cpp +++ b/src/MediaFile/TsMuxer.cpp @@ -70,7 +70,12 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { if(_frameCached.size() != 1){ string merged; _frameCached.for_each([&](const Frame::Ptr &frame){ - merged.append(frame->data(),frame->size()); + if(frame->prefixSize()){ + merged.append(frame->data(),frame->size()); + } else{ + merged.append("\x00\x00\x00\x01",4); + merged.append(frame->data(),frame->size()); + } }); merged_frame = std::make_shared(std::move(merged)); } @@ -78,7 +83,7 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { mpeg_ts_write(_context, it->second, back->keyFrame() ? 0x0001 : 0, back->pts() * 90LL, back->dts() * 90LL, merged_frame->data(), merged_frame->size()); _frameCached.clear(); } - _frameCached.emplace_back(frame); + _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); } break; default: { diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index 9c517595..4de82c30 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -40,12 +40,23 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller,const st ptr->teardown(); }; string prefix = FindField(strUrl.data(), NULL, "://"); + + if (strcasecmp("rtsps",prefix.data()) == 0) { + return PlayerBase::Ptr(new TcpClientWithSSL(poller),releasePlayer); + } + if (strcasecmp("rtsp",prefix.data()) == 0) { return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer); } + + if (strcasecmp("rtmps",prefix.data()) == 0) { + return PlayerBase::Ptr(new TcpClientWithSSL(poller),releasePlayer); + } + if (strcasecmp("rtmp",prefix.data()) == 0) { return PlayerBase::Ptr(new RtmpPlayerImp(poller),releasePlayer); } + return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer); } diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index 8338a0b2..cb115ee7 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -216,7 +216,7 @@ public: void setMediaSouce(const MediaSource::Ptr & src) override { if (_parser) { - return _parser->setMediaSouce(src); + _parser->setMediaSouce(src); } _pMediaSrc = src; } diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index d41168a0..f4bbe1be 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -65,6 +65,8 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, PlayerProxy::PlayerProxy(const string &strVhost, const string &strApp, const string &strSrc, + bool bEnableRtsp, + bool bEnableRtmp, bool bEnableHls, //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int bRecordMp4, @@ -73,6 +75,8 @@ PlayerProxy::PlayerProxy(const string &strVhost, _strVhost = strVhost; _strApp = strApp; _strSrc = strSrc; + _bEnableRtsp = bEnableRtsp; + _bEnableRtmp = bEnableRtmp; _bEnableHls = bEnableHls; //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 _bRecordMp4 = bRecordMp4; @@ -128,13 +132,30 @@ void PlayerProxy::play(const string &strUrlTmp) { } }); MediaPlayer::play(strUrlTmp); + + MediaSource::Ptr mediaSource; + if(dynamic_pointer_cast(_parser)){ + //rtsp拉流 + GET_CONFIG(bool,directProxy,Rtsp::kDirectProxy); + if(directProxy && _bEnableRtsp){ + mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); + } + }else if(dynamic_pointer_cast(_parser)){ + //rtmp拉流 + if(_bEnableRtmp){ + mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); + } + } + if(mediaSource){ + setMediaSouce(mediaSource); + mediaSource->setListener(shared_from_this()); + } } PlayerProxy::~PlayerProxy() { _timer.reset(); } void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ - auto iTaskId = reinterpret_cast(this); auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000,60*1000)); weak_ptr weakSelf = shared_from_this(); _timer = std::make_shared(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { @@ -148,8 +169,13 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ return false; }, getPoller()); } + +int PlayerProxy::readerCount(){ + return (_mediaMuxer ? _mediaMuxer->readerCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); +} + bool PlayerProxy::close(MediaSource &sender,bool force) { - if(!_mediaMuxer || (!force && _mediaMuxer->readerCount() != 0)){ + if(!force && readerCount() != 0){ return false; } @@ -159,6 +185,7 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { auto stronSelf = weakSlef.lock(); if (stronSelf) { stronSelf->_mediaMuxer.reset(); + stronSelf->setMediaSouce(nullptr); stronSelf->teardown(); if(stronSelf->_onClose){ stronSelf->_onClose(); @@ -187,7 +214,7 @@ public: auto iAudioIndex = frame->stamp() / MUTE_ADTS_DATA_MS; if(_iAudioIndex != iAudioIndex){ _iAudioIndex = iAudioIndex; - auto aacFrame = std::make_shared((char *)MUTE_ADTS_DATA, + auto aacFrame = std::make_shared((char *)MUTE_ADTS_DATA, MUTE_ADTS_DATA_LEN, _iAudioIndex * MUTE_ADTS_DATA_MS); FrameRingInterfaceDelegate::inputFrame(aacFrame); @@ -200,7 +227,16 @@ private: void PlayerProxy::onPlaySuccess() { //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 - _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost,_strApp,_strSrc,getDuration(),_bEnableHls,_bRecordMp4)); + if (dynamic_pointer_cast(_pMediaSrc)) { + //rtsp拉流代理 + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bRecordMp4)); + } else if (dynamic_pointer_cast(_pMediaSrc)) { + //rtmp拉流代理 + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bRecordMp4)); + } else { + //其他拉流代理 + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bRecordMp4)); + } _mediaMuxer->setListener(shared_from_this()); auto videoTrack = getTrack(TrackVideo,false); diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index e7a50f9c..57a3278e 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -49,6 +49,8 @@ public: PlayerProxy(const string &strVhost, const string &strApp, const string &strSrc, + bool bEnableRtsp = true, + bool bEnableRtmp = true, bool bEnableHls = true, //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int bRecordMp4 = 0, @@ -85,7 +87,10 @@ private: void onNoneReader(MediaSource &sender) override; void rePlay(const string &strUrl,int iFailedCnt); void onPlaySuccess(); + int readerCount() ; private: + bool _bEnableRtsp; + bool _bEnableRtmp; bool _bEnableHls; //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int _bRecordMp4; diff --git a/src/Pusher/PusherBase.cpp b/src/Pusher/PusherBase.cpp index 47d7226e..a8f47d92 100644 --- a/src/Pusher/PusherBase.cpp +++ b/src/Pusher/PusherBase.cpp @@ -44,12 +44,23 @@ PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller, ptr->teardown(); }; string prefix = FindField(strUrl.data(), NULL, "://"); + + if (strcasecmp("rtsps",prefix.data()) == 0) { + return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); + } + if (strcasecmp("rtsp",prefix.data()) == 0) { return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast(src)),releasePusher); } + + if (strcasecmp("rtmps",prefix.data()) == 0) { + return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); + } + if (strcasecmp("rtmp",prefix.data()) == 0) { return PusherBase::Ptr(new RtmpPusher(poller,dynamic_pointer_cast(src)),releasePusher); } + return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast(src)),releasePusher); } diff --git a/src/Rtmp/RtmpToRtspMediaSource.h b/src/Rtmp/RtmpToRtspMediaSource.h index 6b6b3ec1..f84dec69 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.h +++ b/src/Rtmp/RtmpToRtspMediaSource.h @@ -38,8 +38,8 @@ #include "Rtmp.h" #include "RtmpMediaSource.h" #include "RtmpDemuxer.h" -#include "MediaFile/MediaRecorder.h" -#include "Rtsp/RtspMediaSourceMuxer.h" +#include "Common/MultiMediaSourceMuxer.h" + using namespace std; using namespace toolkit; @@ -55,49 +55,54 @@ public: bool bEnableHls = true, //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int bRecordMp4 = 0, - int ringSize = 0):RtmpMediaSource(vhost, app, id,ringSize){ - _recorder = std::make_shared(vhost, app, id, bEnableHls, bRecordMp4); - _rtmpDemuxer = std::make_shared(); + int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){ + _bEnableHls = bEnableHls; + _bRecordMp4 = bRecordMp4; + _demuxer = std::make_shared(); } virtual ~RtmpToRtspMediaSource(){} void onGetMetaData(const AMFValue &metadata) override { - _rtmpDemuxer = std::make_shared(metadata); + _demuxer = std::make_shared(metadata); RtmpMediaSource::onGetMetaData(metadata); } void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos) override { - _rtmpDemuxer->inputRtmp(pkt); - if(!_rtspMuxer && _rtmpDemuxer->isInited(2000)){ - _rtspMuxer = std::make_shared(getVhost(), - getApp(), - getId(), - std::make_shared(_rtmpDemuxer->getDuration())); - for (auto &track : _rtmpDemuxer->getTracks(false)){ - _rtspMuxer->addTrack(track); - _recorder->addTrack(track); - track->addDelegate(_rtspMuxer); - track->addDelegate(_recorder); + _demuxer->inputRtmp(pkt); + if(!_muxer && _demuxer->isInited(2000)){ + _muxer = std::make_shared(getVhost(), + getApp(), + getId(), + _demuxer->getDuration(), + true,//转rtsp + false,//不重复生成rtmp + _bEnableHls, + _bRecordMp4); + for (auto &track : _demuxer->getTracks(false)){ + _muxer->addTrack(track); + track->addDelegate(_muxer); } - _rtspMuxer->setListener(_listener); + _muxer->setListener(_listener); } RtmpMediaSource::onWrite(pkt,key_pos); } void setListener(const std::weak_ptr &listener) override { RtmpMediaSource::setListener(listener); - if(_rtspMuxer){ - _rtspMuxer->setListener(listener); + if(_muxer){ + _muxer->setListener(listener); } } int readerCount() override { - return RtmpMediaSource::readerCount() + (_rtspMuxer ? _rtspMuxer->readerCount() : 0); + return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); } private: - RtmpDemuxer::Ptr _rtmpDemuxer; - RtspMediaSourceMuxer::Ptr _rtspMuxer; - MediaRecorder::Ptr _recorder; + RtmpDemuxer::Ptr _demuxer; + MultiMediaSourceMuxer::Ptr _muxer; + bool _bEnableHls; + //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 + bool _bRecordMp4; }; } /* namespace mediakit */ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 65ed6dbe..df20c36b 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -198,7 +198,7 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { sendDescribe(); return; } - if(parser.Url() == "302"){ + if(parser.Url() == "302" || parser.Url() == "301"){ auto newUrl = parser["Location"]; if(newUrl.empty()){ throw std::runtime_error("未找到Location字段(跳转url)"); @@ -232,6 +232,31 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { sendSetup(0); } + +//有必要的情况下创建udp端口 +void RtspPlayer::createUdpSockIfNecessary(int track_idx){ + auto &rtpSockRef = _apRtpSock[track_idx]; + auto &rtcpSockRef = _apRtcpSock[track_idx]; + if(!rtpSockRef){ + rtpSockRef.reset(new Socket(getPoller())); + //rtp随机端口 + if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) { + rtpSockRef.reset(); + throw std::runtime_error("open rtp sock failed"); + } + } + + if(!rtcpSockRef){ + rtcpSockRef.reset(new Socket(getPoller())); + //rtcp端口为rtp端口+1,目的是为了兼容某些服务器,其实更推荐随机端口 + if (!rtcpSockRef->bindUdpSock(rtpSockRef->get_local_port() + 1, get_local_ip().data())) { + rtcpSockRef.reset(); + throw std::runtime_error("open rtcp sock failed"); + } + } +} + + //发送SETUP命令 void RtspPlayer::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex); @@ -247,16 +272,7 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) { } break; case Rtsp::RTP_UDP: { - _apRtpSock[trackIndex].reset(new Socket(getPoller())); - if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { - _apRtpSock[trackIndex].reset(); - throw std::runtime_error("open rtp sock err"); - } - _apRtcpSock[trackIndex].reset(new Socket(getPoller())); - if (!_apRtcpSock[trackIndex]->bindUdpSock(_apRtpSock[trackIndex]->get_local_port() + 1, get_local_ip().data())) { - _apRtcpSock[trackIndex].reset(); - throw std::runtime_error("open rtcp sock err"); - } + createUdpSockIfNecessary(trackIndex); sendRtspRequest("SETUP",baseUrl,{"Transport", StrPrinter << "RTP/AVP;unicast;client_port=" << _apRtpSock[trackIndex]->get_local_port() << "-" @@ -280,7 +296,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) } auto strTransport = parser["Transport"]; - if(strTransport.find("TCP") != string::npos){ + if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){ _eType = Rtsp::RTP_TCP; }else if(strTransport.find("multicast") != string::npos){ _eType = Rtsp::RTP_MULTICAST; @@ -314,7 +330,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data()); } } else { - //udp单播 + createUdpSockIfNecessary(uiTrackIndex); + //udp单播 struct sockaddr_in rtpto; rtpto.sin_port = ntohs(rtp_port); rtpto.sin_family = AF_INET; diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 68235d18..4023548b 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -93,6 +93,11 @@ protected: * @param uiLen */ virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); + + /////////////TcpClient override///////////// + void onConnect(const SockException &err) override; + void onRecv(const Buffer::Ptr &pBuf) override; + void onErr(const SockException &ex) override; private: void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); void onPlayResult_l(const SockException &ex); @@ -102,10 +107,6 @@ private: int getTrackIndexByTrackType(TrackType trackType) const; void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); - void onConnect(const SockException &err) override; - void onRecv(const Buffer::Ptr &pBuf) override; - void onErr(const SockException &ex) override; - void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); void handleResDESCRIBE(const Parser &parser); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); @@ -120,6 +121,7 @@ private: void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap()); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header); void sendReceiverReport(bool overTcp,int iTrackIndex); + void createUdpSockIfNecessary(int track_idx); private: string _strUrl; SdpParser _sdpParser; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 6032c4e0..bb5f9431 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -242,6 +242,19 @@ bool RtspPusher::handleAuthenticationFailure(const string ¶msStr) { return false; } +//有必要的情况下创建udp端口 +void RtspPusher::createUdpSockIfNecessary(int track_idx){ + auto &rtpSockRef = _apUdpSock[track_idx]; + if(!rtpSockRef){ + rtpSockRef.reset(new Socket(getPoller())); + //rtp随机端口 + if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) { + rtpSockRef.reset(); + throw std::runtime_error("open rtp sock failed"); + } + } +} + void RtspPusher::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex); auto &track = _aTrackInfo[trackIndex]; @@ -252,11 +265,7 @@ void RtspPusher::sendSetup(unsigned int trackIndex) { } break; case Rtsp::RTP_UDP: { - _apUdpSock[trackIndex].reset(new Socket(getPoller())); - if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { - _apUdpSock[trackIndex].reset(); - throw std::runtime_error("open udp sock err"); - } + createUdpSockIfNecessary(trackIndex); int port = _apUdpSock[trackIndex]->get_local_port(); sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); } @@ -279,7 +288,7 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) } auto strTransport = parser["Transport"]; - if(strTransport.find("TCP") != string::npos){ + if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){ _eType = Rtsp::RTP_TCP; string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data()); @@ -287,19 +296,15 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) throw std::runtime_error("SETUP rtsp pusher can not support multicast!"); }else{ _eType = Rtsp::RTP_UDP; + createUdpSockIfNecessary(uiTrackIndex); const char *strPos = "server_port=" ; auto port_str = FindField((strTransport + ";").data(), strPos, ";"); uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); - auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; - if(!pUdpSockRef){ - pUdpSockRef.reset(new Socket(getPoller())); - } - struct sockaddr_in rtpto; rtpto.sin_port = ntohs(port); rtpto.sin_family = AF_INET; rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); - pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); + _apUdpSock[uiTrackIndex]->setSendPeerAddr((struct sockaddr *)&(rtpto)); } RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP); diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index 0d1c5890..b01af46f 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -65,6 +65,8 @@ private: void sendRtpPacket(const RtpPacket::Ptr & pkt) ; void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); + + void createUdpSockIfNecessary(int track_idx); private: //rtsp鉴权相关 string _rtspMd5Nonce; diff --git a/src/Rtsp/RtspToRtmpMediaSource.h b/src/Rtsp/RtspToRtmpMediaSource.h index c37b74f0..3aec0802 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.h +++ b/src/Rtsp/RtspToRtmpMediaSource.h @@ -29,10 +29,8 @@ #include "Rtmp/amf.h" #include "RtspMediaSource.h" -#include "MediaFile/MediaRecorder.h" -#include "Rtmp/RtmpMediaSource.h" #include "RtspDemuxer.h" -#include "Rtmp/RtmpMediaSourceMuxer.h" +#include "Common/MultiMediaSourceMuxer.h" using namespace toolkit; @@ -46,34 +44,37 @@ public: const string &app, const string &id, bool bEnableHls = true, - //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 + //chenxiaolei 修改为int, 录像最大录制天数,0就是不录 int bRecordMp4 = 0, int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) { - _recorder = std::make_shared(vhost, app, id, bEnableHls, bRecordMp4); + _bEnableHls = bEnableHls; + _bRecordMp4 = bRecordMp4; } virtual ~RtspToRtmpMediaSource() {} virtual void onGetSDP(const string &strSdp) override { - _rtspDemuxer = std::make_shared(strSdp); + _demuxer = std::make_shared(strSdp); RtspMediaSource::onGetSDP(strSdp); } virtual void onWrite(const RtpPacket::Ptr &rtp, bool bKeyPos) override { - if (_rtspDemuxer) { - bKeyPos = _rtspDemuxer->inputRtp(rtp); - if (!_rtmpMuxer && _rtspDemuxer->isInited(2000)) { - _rtmpMuxer = std::make_shared(getVhost(), - getApp(), - getId(), - std::make_shared(_rtspDemuxer->getDuration())); - for (auto &track : _rtspDemuxer->getTracks(false)) { - _rtmpMuxer->addTrack(track); - _recorder->addTrack(track); - track->addDelegate(_rtmpMuxer); - track->addDelegate(_recorder); + if (_demuxer) { + bKeyPos = _demuxer->inputRtp(rtp); + if (!_muxer && _demuxer->isInited(2000)) { + _muxer = std::make_shared(getVhost(), + getApp(), + getId(), + _demuxer->getDuration(), + false,//不重复生成rtsp + true,//转rtmp + _bEnableHls, + _bRecordMp4); + for (auto &track : _demuxer->getTracks(false)) { + _muxer->addTrack(track); + track->addDelegate(_muxer); } - _rtmpMuxer->setListener(_listener); + _muxer->setListener(_listener); } } RtspMediaSource::onWrite(rtp, bKeyPos); @@ -81,17 +82,18 @@ public: void setListener(const std::weak_ptr &listener) override { RtspMediaSource::setListener(listener); - if(_rtmpMuxer){ - _rtmpMuxer->setListener(listener); + if(_muxer){ + _muxer->setListener(listener); } } int readerCount() override { - return RtspMediaSource::readerCount() + (_rtmpMuxer ? _rtmpMuxer->readerCount() : 0); + return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); } private: - RtspDemuxer::Ptr _rtspDemuxer; - RtmpMediaSourceMuxer::Ptr _rtmpMuxer; - MediaRecorder::Ptr _recorder; + RtspDemuxer::Ptr _demuxer; + MultiMediaSourceMuxer::Ptr _muxer; + bool _bEnableHls; + bool _bRecordMp4; }; } /* namespace mediakit */ diff --git a/tests/test_httpApi.cpp b/tests/test_httpApi.cpp index 7f4cd525..bf8ff4d3 100644 --- a/tests/test_httpApi.cpp +++ b/tests/test_httpApi.cpp @@ -56,49 +56,50 @@ onceToken token1([](){ }//namespace Http } // namespace mediakit +void initEventListener(){ + static onceToken s_token([](){ + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){ + //const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed + if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){ + return; + } + //url以"/api/起始,说明是http api" + consumed = true;//该http请求已被消费 -static onceToken s_token([](){ - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){ - //const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed - if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){ - return; - } - //url以"/api/起始,说明是http api" - consumed = true;//该http请求已被消费 + _StrPrinter printer; + ////////////////method//////////////////// + printer << "\r\nmethod:\r\n\t" << parser.Method(); + ////////////////url///////////////// + printer << "\r\nurl:\r\n\t" << parser.Url(); + ////////////////protocol///////////////// + printer << "\r\nprotocol:\r\n\t" << parser.Tail(); + ///////////////args////////////////// + printer << "\r\nargs:\r\n"; + for(auto &pr : parser.getUrlArgs()){ + printer << "\t" << pr.first << " : " << pr.second << "\r\n"; + } + ///////////////header////////////////// + printer << "\r\nheader:\r\n"; + for(auto &pr : parser.getValues()){ + printer << "\t" << pr.first << " : " << pr.second << "\r\n"; + } + ////////////////content///////////////// + printer << "\r\ncontent:\r\n" << parser.Content(); + auto contentOut = printer << endl; - _StrPrinter printer; - ////////////////method//////////////////// - printer << "\r\nmethod:\r\n\t" << parser.Method(); - ////////////////url///////////////// - printer << "\r\nurl:\r\n\t" << parser.Url(); - ////////////////protocol///////////////// - printer << "\r\nprotocol:\r\n\t" << parser.Tail(); - ///////////////args////////////////// - printer << "\r\nargs:\r\n"; - for(auto &pr : parser.getUrlArgs()){ - printer << "\t" << pr.first << " : " << pr.second << "\r\n"; - } - ///////////////header////////////////// - printer << "\r\nheader:\r\n"; - for(auto &pr : parser.getValues()){ - printer << "\t" << pr.first << " : " << pr.second << "\r\n"; - } - ////////////////content///////////////// - printer << "\r\ncontent:\r\n" << parser.Content(); - auto contentOut = printer << endl; - - ////////////////我们测算异步回复,当然你也可以同步回复///////////////// - EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){ - HttpSession::KeyValue headerOut; - //你可以自定义header,如果跟默认header重名,则会覆盖之 - //默认header有:Server,Connection,Date,Content-Type,Content-Length - //请勿覆盖Connection、Content-Length键 - //键名覆盖时不区分大小写 - headerOut["TestHeader"] = "HeaderValue"; - invoker("200 OK",headerOut,contentOut); - }); - }); -}, nullptr); + ////////////////我们测算异步回复,当然你也可以同步回复///////////////// + EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){ + HttpSession::KeyValue headerOut; + //你可以自定义header,如果跟默认header重名,则会覆盖之 + //默认header有:Server,Connection,Date,Content-Type,Content-Length + //请勿覆盖Connection、Content-Length键 + //键名覆盖时不区分大小写 + headerOut["TestHeader"] = "HeaderValue"; + invoker("200 OK",headerOut,contentOut); + }); + }); + }, nullptr); +} int main(int argc,char *argv[]){ //设置退出信号处理函数 @@ -111,6 +112,7 @@ int main(int argc,char *argv[]){ //加载配置文件,如果配置文件不存在就创建一个 loadIniConfig(); + initEventListener(); //加载证书,证书包含公钥和私钥 SSL_Initor::Instance().loadCertificate((exeDir() + "ssl.p12").data()); diff --git a/tests/test_player.cpp b/tests/test_player.cpp index aeecf75d..62852565 100644 --- a/tests/test_player.cpp +++ b/tests/test_player.cpp @@ -25,6 +25,7 @@ */ #include #include +#include "Util/util.h" #include "Util/logger.h" #include #include "Poller/EventPoller.h" @@ -68,22 +69,27 @@ int main(int argc, char *argv[]) { WarnL << "没有视频或者视频不是264编码!"; return; } - SDLDisplayerHelper::Instance().doTask([viedoTrack]() { - std::shared_ptr decoder(new H264Decoder); - std::shared_ptr displayer(new YuvDisplayer); - viedoTrack->addDelegate(std::make_shared([decoder, displayer](const Frame::Ptr &frame) { - SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() { - AVFrame *pFrame = nullptr; - bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(), - frame->stamp(), &pFrame); - if (flag) { - displayer->displayYUV(pFrame); - } - return true; - }); - })); - return true; - }); + + AnyStorage::Ptr storage(new AnyStorage); + viedoTrack->addDelegate(std::make_shared([storage](const Frame::Ptr &frame) { + SDLDisplayerHelper::Instance().doTask([frame,storage]() { + auto &decoder = (*storage)["decoder"]; + auto &displayer = (*storage)["displayer"]; + if(!decoder){ + decoder.set(); + } + if(!displayer){ + displayer.set(); + } + + AVFrame *pFrame = nullptr; + bool flag = decoder.get().inputVideo((unsigned char *) frame->data(), frame->size(), frame->stamp(), &pFrame); + if (flag) { + displayer.get().displayYUV(pFrame); + } + return true; + }); + })); }); diff --git a/tests/test_pusher.cpp b/tests/test_pusher.cpp index 36833ca5..c7a56aa0 100644 --- a/tests/test_pusher.cpp +++ b/tests/test_pusher.cpp @@ -90,7 +90,7 @@ int domain(const string &playUrl, const string &pushUrl) { //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller)); + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",true,true,false,false,-1 , poller)); //可以指定rtsp拉流方式,支持tcp和udp方式,默认tcp // (*player)[Client::kRtpType] = Rtsp::RTP_UDP; player->play(playUrl.data());