diff --git a/ZLToolKit b/ZLToolKit index d3a1bd9d..98d824bd 160000 --- a/ZLToolKit +++ b/ZLToolKit @@ -1 +1 @@ -Subproject commit d3a1bd9d8a3162d0237954034d85f1313008cab2 +Subproject commit 98d824bdc6604b4e761657922c913ea46bd90223 diff --git a/src/Rtsp/UDPServer.cpp b/src/Rtsp/UDPServer.cpp index d41490f4..ec7454d3 100644 --- a/src/Rtsp/UDPServer.cpp +++ b/src/Rtsp/UDPServer.cpp @@ -26,18 +26,14 @@ #include "UDPServer.h" #include "Util/TimeTicker.h" +#include "Util/onceToken.h" + using namespace toolkit; namespace mediakit { -UDPServer &UDPServer::Instance() { - static UDPServer *instance(new UDPServer()); - return *instance; -} -void UDPServer::Destory() { - delete &UDPServer::Instance(); -} - +INSTANCE_IMP(UDPServer); + UDPServer::UDPServer() { } diff --git a/src/Rtsp/UDPServer.h b/src/Rtsp/UDPServer.h index 67b998d5..60aa7343 100644 --- a/src/Rtsp/UDPServer.h +++ b/src/Rtsp/UDPServer.h @@ -27,8 +27,9 @@ #ifndef RTSP_UDPSERVER_H_ #define RTSP_UDPSERVER_H_ -#include #include +#include +#include #include #include #include "Util/util.h" @@ -40,18 +41,23 @@ using namespace toolkit; namespace mediakit { -class UDPServer { +class UDPServer : public std::enable_shared_from_this { public: typedef function< bool(int, const Buffer::Ptr &, struct sockaddr *)> onRecvData; - UDPServer(); - virtual ~UDPServer(); + ~UDPServer(); static UDPServer &Instance(); - static void Destory(); + + /** + * 废弃的接口,无实际操作 + * @deprecated + */ + static void Destory(){}; Socket::Ptr getSock(const char *strLocalIp, int iTrackIndex,uint16_t iLocalPort = 0); void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb); void stopListenPeer(const char *strPeerIp, void *pSelf); private: + UDPServer(); void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr); void onErr(const string &strKey,const SockException &err); unordered_map _mapUpdSock; diff --git a/src/RtspMuxer/H264RtpCodec.cpp b/src/RtspMuxer/H264RtpCodec.cpp index 8c35eed3..8d83c93c 100644 --- a/src/RtspMuxer/H264RtpCodec.cpp +++ b/src/RtspMuxer/H264RtpCodec.cpp @@ -142,7 +142,7 @@ bool H264RtpDecoder::decodeRtp(const RtpPacket::Ptr &rtppack) { return false; } - WarnL << "不支持的rtp类型:" << nal.type << " " << rtppack->sequence; + WarnL << "不支持的rtp类型:" << (int)nal.type << " " << rtppack->sequence; return false; // 29 FU-B 单NAL单元B模式 // 24 STAP-A 单一时间的组合包 diff --git a/tests/test_benchmark.cpp b/tests/test_benchmark.cpp index 5d99043f..82c65647 100644 --- a/tests/test_benchmark.cpp +++ b/tests/test_benchmark.cpp @@ -41,9 +41,10 @@ using namespace mediakit; int main(int argc, char *argv[]) { //设置退出信号处理函数 - signal(SIGINT, [](int) { EventPoller::Instance().shutdown(); }); + signal(SIGINT, [](int) { EventPollerPool::Instance().shutdown(); }); + //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); + Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); if (argc != 5) { @@ -55,42 +56,33 @@ int main(int argc, char *argv[]) { return 0; } - { - list playerList; - auto playerCnt = atoi(argv[1]);//启动的播放器个数 - atomic_int alivePlayerCnt(0); - //每隔若干毫秒启动一个播放器(如果一次性全部启动,服务器和客户端可能都承受不了) - AsyncTaskThread::Instance().DoTaskDelay(0, atoi(argv[2]), [&]() { - MediaPlayer::Ptr player(new MediaPlayer()); - player->setOnPlayResult([&](const SockException &ex) { - if (!ex) { - ++alivePlayerCnt; - } - }); - player->setOnShutdown([&](const SockException &ex) { - --alivePlayerCnt; - }); - (*player)[RtspPlayer::kRtpType] = atoi(argv[4]); - player->play(argv[3]); - playerList.push_back(player); - return playerCnt--; + list playerList; + auto playerCnt = atoi(argv[1]);//启动的播放器个数 + atomic_int alivePlayerCnt(0); + //每隔若干毫秒启动一个播放器(如果一次性全部启动,服务器和客户端可能都承受不了) + AsyncTaskThread::Instance().DoTaskDelay(0, atoi(argv[2]), [&]() { + MediaPlayer::Ptr player(new MediaPlayer()); + player->setOnPlayResult([&](const SockException &ex) { + if (!ex) { + ++alivePlayerCnt; + } }); - - AsyncTaskThread::Instance().DoTaskDelay(0, 1000, [&]() { - InfoL << "存活播放器个数:" << alivePlayerCnt.load(); - return true; + player->setOnShutdown([&](const SockException &ex) { + --alivePlayerCnt; }); - EventPoller::Instance().runLoop(); - AsyncTaskThread::Instance().CancelTask(0); - } - - static onceToken token(nullptr, []() { - WorkThreadPool::Instance(); - UDPServer::Destory(); - EventPoller::Destory(); - AsyncTaskThread::Destory(); - Logger::Destory(); + (*player)[RtspPlayer::kRtpType] = atoi(argv[4]); + player->play(argv[3]); + playerList.push_back(player); + return playerCnt--; }); + + AsyncTaskThread::Instance().DoTaskDelay(0, 1000, [&]() { + InfoL << "存活播放器个数:" << alivePlayerCnt.load(); + return true; + }); + + EventPollerPool::Instance().wait(); + AsyncTaskThread::Instance().CancelTask(0); return 0; } diff --git a/tests/test_httpApi.cpp b/tests/test_httpApi.cpp index f1c546af..b4887f42 100644 --- a/tests/test_httpApi.cpp +++ b/tests/test_httpApi.cpp @@ -102,10 +102,11 @@ static onceToken s_token([](){ int main(int argc,char *argv[]){ //设置退出信号处理函数 - signal(SIGINT, [](int){EventPoller::Instance().shutdown();}); + signal(SIGINT, [](int){EventPollerPool::Instance().shutdown();}); //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); - Logger::Instance().setWriter(std::make_shared()); + Logger::Instance().add(std::make_shared()); + Logger::Instance().setWriter(std::make_shared()); + //加载配置文件,如果配置文件不存在就创建一个 loadIniConfig(); @@ -132,14 +133,7 @@ int main(int argc,char *argv[]){ InfoL << "你可以在浏览器输入:http://127.0.0.1/api/my_api?key0=val0&key1=参数1" << endl; - EventPoller::Instance().runLoop(); - - static onceToken s_token(nullptr,[]() { - //TcpServer用到了WorkThreadPool - WorkThreadPool::Destory(); - EventPoller::Destory(); - Logger::Destory(); - }); + EventPollerPool::Instance().wait(); return 0; } diff --git a/tests/test_httpClient.cpp b/tests/test_httpClient.cpp index 9c08932d..7acb9f68 100644 --- a/tests/test_httpClient.cpp +++ b/tests/test_httpClient.cpp @@ -41,93 +41,132 @@ using namespace mediakit; int main(int argc, char *argv[]) { //设置退出信号处理函数 - signal(SIGINT, [](int) { EventPoller::Instance().shutdown(); }); + signal(SIGINT, [](int) { EventPollerPool::Instance().shutdown(); }); //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); + Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); - { - ///////////////////////////////http downloader/////////////////////// - //下载器map - map downloaderMap; - //下载两个文件,一个是http下载,一个https下载 - auto urlList = {"https://timgsa.baidu.com/timg?image&quality=80&" - "size=b9999_10000&sec=1537717640404&" - "di=f602efbebbc1e7f6b9ccb0bf0def89d0&" - "imgtype=0&" - "src=http%3A%2F%2Fimgsrc.baidu.com%2Fimgad%2Fpic%2Fitem%2F241f95cad1c8a786ff65052a6d09c93d70cf5042.jpg",}; - for (auto &url : urlList) { - //创建下载器 - HttpDownloader::Ptr downloader(new HttpDownloader()); - downloader->setOnResult([](ErrCode code, const char *errMsg, const char *filePath) { - DebugL << "=====================HttpDownloader result======================="; - //下载结果回调 - if (code == Err_success) { - //文件下载成功 - InfoL << "download file success:" << filePath; - } else { - //下载失败 - WarnL << "code:" << code << " msg:" << errMsg; - } - }); - //断点续传功能,开启后可能会遇到416的错误(因为上次文件已经下载完全) - downloader->startDownload(url, exeDir() + MD5(url).hexdigest() + ".jpg", true); - //下载器必须被强引用,否则作用域一失效就会导致对象销毁 - downloaderMap.emplace(url, downloader); - } + ///////////////////////////////http downloader/////////////////////// + //下载器map + map downloaderMap; + //下载两个文件,一个是http下载,一个https下载 + auto urlList = {"https://timgsa.baidu.com/timg?image&quality=80&" + "size=b9999_10000&sec=1537717640404&" + "di=f602efbebbc1e7f6b9ccb0bf0def89d0&" + "imgtype=0&" + "src=http%3A%2F%2Fimgsrc.baidu.com%2Fimgad%2Fpic%2Fitem%2F241f95cad1c8a786ff65052a6d09c93d70cf5042.jpg",}; - ///////////////////////////////http get/////////////////////// - //创建一个Http请求器 - HttpRequester::Ptr requesterGet(new HttpRequester()); - //使用GET方式请求 - requesterGet->setMethod("GET"); - //设置http请求头,我们假设设置cookie,当然你也可以设置其他http头 - requesterGet->addHeader("Cookie", "SESSIONID=e1aa89b3-f79f-4ac6-8ae2-0cea9ae8e2d7"); - //开启请求,该api会返回当前主机外网ip等信息 - requesterGet->startRequester("http://pv.sohu.com/cityjson?ie=utf-8",//url地址 - [](const SockException &ex, //网络相关的失败信息,如果为空就代表成功 - const string &status, //http回复的状态码,比如说200/404 - const HttpClient::HttpHeader &header, //http回复头 - const string &strRecvBody) { //http回复body - DebugL << "=====================HttpRequester GET==========================="; - if (ex) { - //网络相关的错误 - WarnL << "network err:" << ex.getErrCode() << " " << ex.what(); - } else { - //打印http回复信息 - _StrPrinter printer; - for (auto &pr: header) { - printer << pr.first << ":" << pr.second << "\r\n"; - } - InfoL << "status:" << status << "\r\n" - << "header:\r\n" << (printer << endl) - << "\r\nbody:" << strRecvBody; + for (auto &url : urlList) { + //创建下载器 + HttpDownloader::Ptr downloader(new HttpDownloader()); + downloader->setOnResult([](ErrCode code, const char *errMsg, const char *filePath) { + DebugL << "=====================HttpDownloader result======================="; + //下载结果回调 + if (code == Err_success) { + //文件下载成功 + InfoL << "download file success:" << filePath; + } else { + //下载失败 + WarnL << "code:" << code << " msg:" << errMsg; + } + }); + //断点续传功能,开启后可能会遇到416的错误(因为上次文件已经下载完全) + downloader->startDownload(url, exeDir() + MD5(url).hexdigest() + ".jpg", true); + //下载器必须被强引用,否则作用域一失效就会导致对象销毁 + downloaderMap.emplace(url, downloader); + } + + ///////////////////////////////http get/////////////////////// + //创建一个Http请求器 + HttpRequester::Ptr requesterGet(new HttpRequester()); + //使用GET方式请求 + requesterGet->setMethod("GET"); + //设置http请求头,我们假设设置cookie,当然你也可以设置其他http头 + requesterGet->addHeader("Cookie", "SESSIONID=e1aa89b3-f79f-4ac6-8ae2-0cea9ae8e2d7"); + //开启请求,该api会返回当前主机外网ip等信息 + requesterGet->startRequester("http://pv.sohu.com/cityjson?ie=utf-8",//url地址 + [](const SockException &ex, //网络相关的失败信息,如果为空就代表成功 + const string &status, //http回复的状态码,比如说200/404 + const HttpClient::HttpHeader &header, //http回复头 + const string &strRecvBody) { //http回复body + DebugL << "=====================HttpRequester GET==========================="; + if (ex) { + //网络相关的错误 + WarnL << "network err:" << ex.getErrCode() << " " << ex.what(); + } else { + //打印http回复信息 + _StrPrinter printer; + for (auto &pr: header) { + printer << pr.first << ":" << pr.second << "\r\n"; } - }); + InfoL << "status:" << status << "\r\n" + << "header:\r\n" << (printer << endl) + << "\r\nbody:" << strRecvBody; + } + }); - ///////////////////////////////http post/////////////////////// - //创建一个Http请求器 - HttpRequester::Ptr requesterPost(new HttpRequester()); - //使用POST方式请求 - requesterPost->setMethod("POST"); - //设置http请求头 - requesterPost->addHeader("X-Requested-With", "XMLHttpRequest"); - requesterPost->addHeader("Origin", "http://fanyi.baidu.com"); - //设置POST参数列表 - HttpArgs args; - args["query"] = "test"; - args["from"] = "en"; - args["to"] = "zh"; - args["transtype"] = "translang"; - args["simple_means_flag"] = "3"; - requesterPost->setBody(args.make()); - //开启请求 - requesterPost->startRequester("http://fanyi.baidu.com/langdetect",//url地址 + ///////////////////////////////http post/////////////////////// + //创建一个Http请求器 + HttpRequester::Ptr requesterPost(new HttpRequester()); + //使用POST方式请求 + requesterPost->setMethod("POST"); + //设置http请求头 + requesterPost->addHeader("X-Requested-With", "XMLHttpRequest"); + requesterPost->addHeader("Origin", "http://fanyi.baidu.com"); + //设置POST参数列表 + HttpArgs args; + args["query"] = "test"; + args["from"] = "en"; + args["to"] = "zh"; + args["transtype"] = "translang"; + args["simple_means_flag"] = "3"; + requesterPost->setBody(args.make()); + //开启请求 + requesterPost->startRequester("http://fanyi.baidu.com/langdetect",//url地址 + [](const SockException &ex, //网络相关的失败信息,如果为空就代表成功 + const string &status, //http回复的状态码,比如说200/404 + const HttpClient::HttpHeader &header, //http回复头 + const string &strRecvBody) { //http回复body + DebugL << "=====================HttpRequester POST=========================="; + if (ex) { + //网络相关的错误 + WarnL << "network err:" << ex.getErrCode() << " " << ex.what(); + } else { + //打印http回复信息 + _StrPrinter printer; + for (auto &pr: header) { + printer << pr.first << ":" << pr.second << "\r\n"; + } + InfoL << "status:" << status << "\r\n" + << "header:\r\n" << (printer << endl) + << "\r\nbody:" << strRecvBody; + } + }); + + ///////////////////////////////http upload/////////////////////// + //创建一个Http请求器 + HttpRequester::Ptr requesterUploader(new HttpRequester()); + //使用POST方式请求 + requesterUploader->setMethod("POST"); + //设置http请求头 + HttpArgs argsUploader; + argsUploader["query"] = "test"; + argsUploader["from"] = "en"; + argsUploader["to"] = "zh"; + argsUploader["transtype"] = "translang"; + argsUploader["simple_means_flag"] = "3"; + + static string boundary = "0xKhTmLbOuNdArY"; + HttpMultiFormBody::Ptr body(new HttpMultiFormBody(argsUploader, exePath(), boundary)); + requesterUploader->setBody(body); + requesterUploader->addHeader("Content-Type", HttpMultiFormBody::multiFormContentType(boundary)); + //开启请求 + requesterUploader->startRequester("http://fanyi.baidu.com/langdetect",//url地址 [](const SockException &ex, //网络相关的失败信息,如果为空就代表成功 const string &status, //http回复的状态码,比如说200/404 const HttpClient::HttpHeader &header, //http回复头 const string &strRecvBody) { //http回复body - DebugL << "=====================HttpRequester POST=========================="; + DebugL << "=====================HttpRequester Uploader=========================="; if (ex) { //网络相关的错误 WarnL << "network err:" << ex.getErrCode() << " " << ex.what(); @@ -143,52 +182,8 @@ int main(int argc, char *argv[]) { } }); - ///////////////////////////////http upload/////////////////////// - //创建一个Http请求器 - HttpRequester::Ptr requesterUploader(new HttpRequester()); - //使用POST方式请求 - requesterUploader->setMethod("POST"); - //设置http请求头 - HttpArgs argsUploader; - argsUploader["query"] = "test"; - argsUploader["from"] = "en"; - argsUploader["to"] = "zh"; - argsUploader["transtype"] = "translang"; - argsUploader["simple_means_flag"] = "3"; - - static string boundary = "0xKhTmLbOuNdArY"; - HttpMultiFormBody::Ptr body(new HttpMultiFormBody(argsUploader, exePath(), boundary)); - requesterUploader->setBody(body); - requesterUploader->addHeader("Content-Type", HttpMultiFormBody::multiFormContentType(boundary)); - //开启请求 - requesterUploader->startRequester("http://fanyi.baidu.com/langdetect",//url地址 - [](const SockException &ex, //网络相关的失败信息,如果为空就代表成功 - const string &status, //http回复的状态码,比如说200/404 - const HttpClient::HttpHeader &header, //http回复头 - const string &strRecvBody) { //http回复body - DebugL << "=====================HttpRequester Uploader=========================="; - if (ex) { - //网络相关的错误 - WarnL << "network err:" << ex.getErrCode() << " " << ex.what(); - } else { - //打印http回复信息 - _StrPrinter printer; - for (auto &pr: header) { - printer << pr.first << ":" << pr.second << "\r\n"; - } - InfoL << "status:" << status << "\r\n" - << "header:\r\n" << (printer << endl) - << "\r\nbody:" << strRecvBody; - } - }); - - //事件轮询 - EventPoller::Instance().runLoop(); - } - //程序开始退出 - EventPoller::Destory(); - AsyncTaskThread::Destory(); - Logger::Destory(); + //事件轮询 + EventPollerPool::Instance().wait(); return 0; } diff --git a/tests/test_player.cpp b/tests/test_player.cpp index bb10dfe1..7a9858c6 100644 --- a/tests/test_player.cpp +++ b/tests/test_player.cpp @@ -43,7 +43,8 @@ int main(int argc, char *argv[]) { //设置退出信号处理函数 signal(SIGINT, [](int) { SDLDisplayerHelper::Instance().shutdown(); }); //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); + Logger::Instance().add(std::make_shared()); + Logger::Instance().setWriter(std::make_shared()); if (argc != 3) { ErrorL << "\r\n测试方法:./test_player rtxp_url rtp_type\r\n" @@ -54,51 +55,46 @@ int main(int argc, char *argv[]) { } - { - MediaPlayer::Ptr player(new MediaPlayer()); - weak_ptr weakPlayer = player; - player->setOnPlayResult([weakPlayer](const SockException &ex) { - InfoL << "OnPlayResult:" << ex.what(); - auto strongPlayer = weakPlayer.lock(); - if (ex || !strongPlayer) { - return; - } + MediaPlayer::Ptr player(new MediaPlayer()); + weak_ptr weakPlayer = player; + player->setOnPlayResult([weakPlayer](const SockException &ex) { + InfoL << "OnPlayResult:" << ex.what(); + auto strongPlayer = weakPlayer.lock(); + if (ex || !strongPlayer) { + return; + } - auto viedoTrack = strongPlayer->getTrack(TrackVideo); - if (!viedoTrack || viedoTrack->getCodecId() != CodecH264) { - WarnL << "没有视频或者视频不是264编码!"; - return; - } - SDLDisplayerHelper::Instance().doTask([viedoTrack]() { - std::shared_ptr decoder(new H264Decoder); - std::shared_ptr displayer(new YuvDisplayer); - viedoTrack->addDelegate(std::make_shared([decoder, displayer](const Frame::Ptr &frame) { - SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() { - AVFrame *pFrame = nullptr; - bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(), - frame->stamp(), &pFrame); - if (flag) { - displayer->displayYUV(pFrame); - } - return true; - }); - })); - return true; - }); + auto viedoTrack = strongPlayer->getTrack(TrackVideo); + if (!viedoTrack || viedoTrack->getCodecId() != CodecH264) { + WarnL << "没有视频或者视频不是264编码!"; + return; + } + SDLDisplayerHelper::Instance().doTask([viedoTrack]() { + std::shared_ptr decoder(new H264Decoder); + std::shared_ptr displayer(new YuvDisplayer); + viedoTrack->addDelegate(std::make_shared([decoder, displayer](const Frame::Ptr &frame) { + SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() { + AVFrame *pFrame = nullptr; + bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(), + frame->stamp(), &pFrame); + if (flag) { + displayer->displayYUV(pFrame); + } + return true; + }); + })); + return true; }); + }); - player->setOnShutdown([](const SockException &ex) { - ErrorL << "OnShutdown:" << ex.what(); - }); - (*player)[RtspPlayer::kRtpType] = atoi(argv[2]); - player->play(argv[1]); - SDLDisplayerHelper::Instance().runLoop(); - } - UDPServer::Destory(); - EventPoller::Destory(); - AsyncTaskThread::Destory(); - Logger::Destory(); + player->setOnShutdown([](const SockException &ex) { + ErrorL << "OnShutdown:" << ex.what(); + }); + (*player)[RtspPlayer::kRtpType] = atoi(argv[2]); + player->play(argv[1]); + + SDLDisplayerHelper::Instance().runLoop(); return 0; } diff --git a/tests/test_rtmpPusher.cpp b/tests/test_rtmpPusher.cpp index 1f0efe91..fa576419 100644 --- a/tests/test_rtmpPusher.cpp +++ b/tests/test_rtmpPusher.cpp @@ -83,37 +83,26 @@ void rePushDelay(const string &app, const string &stream, const string &url) { //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 int domain(const string &playUrl, const string &pushUrl) { //设置退出信号处理函数 - signal(SIGINT, [](int) { EventPoller::Instance().shutdown(); }); + signal(SIGINT, [](int) { EventPollerPool::Instance().shutdown(); }); //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); + Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); - { - //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" - //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream")); - player->play(playUrl.data()); + //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" + //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream")); + player->play(playUrl.data()); - //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发 - NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, - [pushUrl](BroadcastMediaChangedArgs) { - //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 - if(bRegist && schema == RTMP_SCHEMA){ - createPusher(app, stream, pushUrl); - } - }); - - //事件轮询 - EventPoller::Instance().runLoop(); - pusher.reset(); - } - //删除事件监听 - NoticeCenter::Instance().delListener(nullptr); - - //清理程序 - EventPoller::Destory(); - AsyncTaskThread::Destory(); - Logger::Destory(); + //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, + [pushUrl](BroadcastMediaChangedArgs) { + //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源 + if(bRegist && schema == RTMP_SCHEMA){ + createPusher(app, stream, pushUrl); + } + }); + //事件轮询 + EventPollerPool::Instance().wait(); return 0; } diff --git a/tests/test_rtmpPusherMp4.cpp b/tests/test_rtmpPusherMp4.cpp index cd789f6f..e1d89c35 100644 --- a/tests/test_rtmpPusherMp4.cpp +++ b/tests/test_rtmpPusherMp4.cpp @@ -92,9 +92,9 @@ void rePushDelay(const string &app,const string &stream,const string &url){ //这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了 int domain(const string & filePath,const string & pushUrl){ //设置退出信号处理函数 - signal(SIGINT, [](int){EventPoller::Instance().shutdown();}); + signal(SIGINT, [](int){EventPollerPool::Instance().shutdown();}); //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); + Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); //录像应用名称默认为record @@ -104,16 +104,7 @@ int domain(const string & filePath,const string & pushUrl){ createPusher(appName,filePath,pushUrl); //开始事件轮询 - EventPoller::Instance().runLoop(); - //删除事件监听 - NoticeCenter::Instance().delListener(nullptr); - //销毁推流器 - pusher.reset(); - - //程序清理 - EventPoller::Destory(); - AsyncTaskThread::Destory(); - Logger::Destory(); + EventPollerPool::Instance().wait(); return 0; } diff --git a/tests/test_server.cpp b/tests/test_server.cpp index d1eea4bb..89cc8d02 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -229,141 +229,132 @@ static onceToken s_token([](){ int main(int argc,char *argv[]) { //设置退出信号处理函数 - signal(SIGINT, [](int) { EventPoller::Instance().shutdown(); }); + signal(SIGINT, [](int) { EventPollerPool::Instance().shutdown(); }); signal(SIGHUP, [](int) { loadIniConfig(); }); //设置日志 - Logger::Instance().add(std::make_shared("stdout", LTrace)); + Logger::Instance().add(std::make_shared()); + Logger::Instance().add(std::make_shared()); Logger::Instance().setWriter(std::make_shared()); //加载配置文件,如果配置文件不存在就创建一个 loadIniConfig(); - { - //这里是拉流地址,支持rtmp/rtsp协议,负载必须是H264+AAC - //如果是其他不识别的音视频将会被忽略(譬如说h264+adpcm转发后会去除音频) - auto urlList = {"rtmp://live.hkstv.hk.lxdns.com/live/hks1", - "rtmp://live.hkstv.hk.lxdns.com/live/hks2" - //rtsp链接支持输入用户名密码 - /*"rtsp://admin:jzan123456@192.168.0.122/"*/}; - map proxyMap; - int i = 0; - for (auto &url : urlList) { - //PlayerProxy构造函数前两个参数分别为应用名(app),流id(streamId) - //比如说应用为live,流id为0,那么直播地址为: - //hls地址 : http://127.0.0.1/live/0/hls.m3u8 - //http-flv地址 : http://127.0.0.1/live/0.flv - //rtsp地址 : rtsp://127.0.0.1/live/0 - //rtmp地址 : rtmp://127.0.0.1/live/0 + //这里是拉流地址,支持rtmp/rtsp协议,负载必须是H264+AAC + //如果是其他不识别的音视频将会被忽略(譬如说h264+adpcm转发后会去除音频) + auto urlList = {"rtmp://live.hkstv.hk.lxdns.com/live/hks1", + "rtmp://live.hkstv.hk.lxdns.com/live/hks2" + //rtsp链接支持输入用户名密码 + /*"rtsp://admin:jzan123456@192.168.0.122/"*/}; + map proxyMap; + int i = 0; + for (auto &url : urlList) { + //PlayerProxy构造函数前两个参数分别为应用名(app),流id(streamId) + //比如说应用为live,流id为0,那么直播地址为: - //录像地址为(当然vlc不支持这么多级的rtmp url,可以用test_player测试rtmp点播): - //http://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4 - //rtsp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4 - //rtmp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4 + //hls地址 : http://127.0.0.1/live/0/hls.m3u8 + //http-flv地址 : http://127.0.0.1/live/0.flv + //rtsp地址 : rtsp://127.0.0.1/live/0 + //rtmp地址 : rtmp://127.0.0.1/live/0 - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "live", to_string(i).data())); - //指定RTP over TCP(播放rtsp时有效) - (*player)[RtspPlayer::kRtpType] = PlayerBase::RTP_TCP; - //开始播放,如果播放失败或者播放中止,将会自动重试若干次,重试次数在配置文件中配置,默认一直重试 - player->play(url); - //需要保存PlayerProxy,否则作用域结束就会销毁该对象 - proxyMap.emplace(to_string(i), player); - ++i; + //录像地址为(当然vlc不支持这么多级的rtmp url,可以用test_player测试rtmp点播): + //http://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4 + //rtsp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4 + //rtmp://127.0.0.1/record/live/0/2017-04-11/11-09-38.mp4 + + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "live", to_string(i).data())); + //指定RTP over TCP(播放rtsp时有效) + (*player)[RtspPlayer::kRtpType] = PlayerBase::RTP_TCP; + //开始播放,如果播放失败或者播放中止,将会自动重试若干次,重试次数在配置文件中配置,默认一直重试 + player->play(url); + //需要保存PlayerProxy,否则作用域结束就会销毁该对象 + proxyMap.emplace(to_string(i), player); + ++i; + } + + DebugL << "\r\n" + " PlayerProxy构造函数前两个参数分别为应用名(app),流id(streamId)\n" + " 比如说应用为live,流id为0,那么直播地址为:\n" + " hls地址 : http://127.0.0.1/live/0/hls.m3u8\n" + " http-flv地址 : http://127.0.0.1/live/0.flv\n" + " rtsp地址 : rtsp://127.0.0.1/live/0\n" + " rtmp地址 : rtmp://127.0.0.1/live/0"; + + //请把证书"test_server.pem"放置在本程序可执行程序同目录下 + try { + //加载证书,证书包含公钥和私钥 + SSL_Initor::Instance().loadServerPem((exePath() + ".pem").data()); + } catch (...) { + ErrorL << "请把证书:" << (exeName() + ".pem") << "放置在本程序可执行程序同目录下:" << exeDir() << endl; + proxyMap.clear(); + return 0; + } + + uint16_t shellPort = mINI::Instance()[Shell::kPort]; + uint16_t rtspPort = mINI::Instance()[Rtsp::kPort]; + uint16_t rtspsPort = mINI::Instance()[Rtsp::kSSLPort]; + uint16_t rtmpPort = mINI::Instance()[Rtmp::kPort]; + uint16_t httpPort = mINI::Instance()[Http::kPort]; + uint16_t httpsPort = mINI::Instance()[Http::kSSLPort]; + + //简单的telnet服务器,可用于服务器调试,但是不能使用23端口,否则telnet上了莫名其妙的现象 + //测试方法:telnet 127.0.0.1 9000 + TcpServer::Ptr shellSrv(new TcpServer()); + TcpServer::Ptr rtspSrv(new TcpServer()); + TcpServer::Ptr rtmpSrv(new TcpServer()); + TcpServer::Ptr httpSrv(new TcpServer()); + + shellSrv->start(shellPort); + rtspSrv->start(rtspPort);//默认554 + rtmpSrv->start(rtmpPort);//默认1935 + //http服务器,支持websocket + httpSrv->start(httpPort);//默认80 + + //如果支持ssl,还可以开启https服务器 + TcpServer::Ptr httpsSrv(new TcpServer()); + //https服务器,支持websocket + httpsSrv->start(httpsPort);//默认443 + + //支持ssl加密的rtsp服务器,可用于诸如亚马逊echo show这样的设备访问 + TcpServer::Ptr rtspSSLSrv(new TcpServer()); + rtspSSLSrv->start(rtspsPort);//默认322 + + //服务器支持动态切换端口(不影响现有连接) + NoticeCenter::Instance().addListener(ReloadConfigTag,Broadcast::kBroadcastReloadConfig,[&](BroadcastReloadConfigArgs){ + //重新创建服务器 + if(shellPort != mINI::Instance()[Shell::kPort].as()){ + shellPort = mINI::Instance()[Shell::kPort]; + shellSrv->start(shellPort); + InfoL << "重启shell服务器:" << shellPort; + } + if(rtspPort != mINI::Instance()[Rtsp::kPort].as()){ + rtspPort = mINI::Instance()[Rtsp::kPort]; + rtspSrv->start(rtspPort); + InfoL << "重启rtsp服务器" << rtspPort; + } + if(rtmpPort != mINI::Instance()[Rtmp::kPort].as()){ + rtmpPort = mINI::Instance()[Rtmp::kPort]; + rtmpSrv->start(rtmpPort); + InfoL << "重启rtmp服务器" << rtmpPort; + } + if(httpPort != mINI::Instance()[Http::kPort].as()){ + httpPort = mINI::Instance()[Http::kPort]; + httpSrv->start(httpPort); + InfoL << "重启http服务器" << httpPort; + } + if(httpsPort != mINI::Instance()[Http::kSSLPort].as()){ + httpsPort = mINI::Instance()[Http::kSSLPort]; + httpsSrv->start(httpsPort); + InfoL << "重启https服务器" << httpsPort; } - DebugL << "\r\n" - " PlayerProxy构造函数前两个参数分别为应用名(app),流id(streamId)\n" - " 比如说应用为live,流id为0,那么直播地址为:\n" - " hls地址 : http://127.0.0.1/live/0/hls.m3u8\n" - " http-flv地址 : http://127.0.0.1/live/0.flv\n" - " rtsp地址 : rtsp://127.0.0.1/live/0\n" - " rtmp地址 : rtmp://127.0.0.1/live/0"; - - //请把证书"test_server.pem"放置在本程序可执行程序同目录下 - try { - //加载证书,证书包含公钥和私钥 - SSL_Initor::Instance().loadServerPem((exePath() + ".pem").data()); - } catch (...) { - ErrorL << "请把证书:" << (exeName() + ".pem") << "放置在本程序可执行程序同目录下:" << exeDir() << endl; - proxyMap.clear(); - return 0; + if(rtspsPort != mINI::Instance()[Rtsp::kSSLPort].as()){ + rtspsPort = mINI::Instance()[Rtsp::kSSLPort]; + rtspSSLSrv->start(rtspsPort); + InfoL << "重启rtsps服务器" << rtspsPort; } + }); - uint16_t shellPort = mINI::Instance()[Shell::kPort]; - uint16_t rtspPort = mINI::Instance()[Rtsp::kPort]; - uint16_t rtspsPort = mINI::Instance()[Rtsp::kSSLPort]; - uint16_t rtmpPort = mINI::Instance()[Rtmp::kPort]; - uint16_t httpPort = mINI::Instance()[Http::kPort]; - uint16_t httpsPort = mINI::Instance()[Http::kSSLPort]; - - //简单的telnet服务器,可用于服务器调试,但是不能使用23端口,否则telnet上了莫名其妙的现象 - //测试方法:telnet 127.0.0.1 9000 - TcpServer::Ptr shellSrv(new TcpServer()); - TcpServer::Ptr rtspSrv(new TcpServer()); - TcpServer::Ptr rtmpSrv(new TcpServer()); - TcpServer::Ptr httpSrv(new TcpServer()); - - shellSrv->start(shellPort); - rtspSrv->start(rtspPort);//默认554 - rtmpSrv->start(rtmpPort);//默认1935 - //http服务器,支持websocket - httpSrv->start(httpPort);//默认80 - - //如果支持ssl,还可以开启https服务器 - TcpServer::Ptr httpsSrv(new TcpServer()); - //https服务器,支持websocket - httpsSrv->start(httpsPort);//默认443 - - //支持ssl加密的rtsp服务器,可用于诸如亚马逊echo show这样的设备访问 - TcpServer::Ptr rtspSSLSrv(new TcpServer()); - rtspSSLSrv->start(rtspsPort);//默认322 - - //服务器支持动态切换端口(不影响现有连接) - NoticeCenter::Instance().addListener(ReloadConfigTag,Broadcast::kBroadcastReloadConfig,[&](BroadcastReloadConfigArgs){ - //重新创建服务器 - if(shellPort != mINI::Instance()[Shell::kPort].as()){ - shellPort = mINI::Instance()[Shell::kPort]; - shellSrv->start(shellPort); - InfoL << "重启shell服务器:" << shellPort; - } - if(rtspPort != mINI::Instance()[Rtsp::kPort].as()){ - rtspPort = mINI::Instance()[Rtsp::kPort]; - rtspSrv->start(rtspPort); - InfoL << "重启rtsp服务器" << rtspPort; - } - if(rtmpPort != mINI::Instance()[Rtmp::kPort].as()){ - rtmpPort = mINI::Instance()[Rtmp::kPort]; - rtmpSrv->start(rtmpPort); - InfoL << "重启rtmp服务器" << rtmpPort; - } - if(httpPort != mINI::Instance()[Http::kPort].as()){ - httpPort = mINI::Instance()[Http::kPort]; - httpSrv->start(httpPort); - InfoL << "重启http服务器" << httpPort; - } - if(httpsPort != mINI::Instance()[Http::kSSLPort].as()){ - httpsPort = mINI::Instance()[Http::kSSLPort]; - httpsSrv->start(httpsPort); - InfoL << "重启https服务器" << httpsPort; - } - - if(rtspsPort != mINI::Instance()[Rtsp::kSSLPort].as()){ - rtspsPort = mINI::Instance()[Rtsp::kSSLPort]; - rtspSSLSrv->start(rtspsPort); - InfoL << "重启rtsps服务器" << rtspsPort; - } - }); - - EventPoller::Instance().runLoop(); - }//设置作用域,作用域结束后会销毁临时变量;省去手动注销服务器 - - - //rtsp服务器用到udp端口分配器了 - UDPServer::Destory(); - //TcpServer用到了WorkThreadPool - WorkThreadPool::Destory(); - EventPoller::Destory(); - AsyncTaskThread::Destory(); - Logger::Destory(); + EventPollerPool::Instance().wait(); return 0; }