diff --git a/.gitattributes b/.gitattributes index b6649863..b7f1a8b2 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,4 +1,2 @@ -release/ filter=lfs diff=lfs merge=lfs -text -*.a filter=lfs diff=lfs merge=lfs -text *.h linguist-language=cpp *.c linguist-language=cpp diff --git a/.github/workflows/ccpp.yml b/.github/workflows/ccpp.yml new file mode 100644 index 00000000..dc2660e2 --- /dev/null +++ b/.github/workflows/ccpp.yml @@ -0,0 +1,25 @@ +name: C/C++ CI + +on: [push] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + + - name: 下载submodule源码 + run: git submodule update --init + + - name: apt-get安装依赖库(非必选) + run: sudo apt-get install -y cmake libmysqlclient-dev libssl-dev libx264-dev libfaac-dev libmp4v2-dev libsdl-dev libavcodec-dev libavutil-dev + + - name: 编译 + run: mkdir -p linux_build && cd linux_build && cmake .. && make -j4 + + - name: 运行MediaServer + run: pwd && cd release/linux/Debug && sudo ./MediaServer -d & + + diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 4a6029b7..628d3b25 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 4a6029b74b4f2339e32b8c546388de51e4ec1bcb +Subproject commit 628d3b2527f63b54a5eb38b9e9973254d4a2192b diff --git a/Android/app/libs/arm64-v8a/libcrypto.a b/Android/app/libs/arm64-v8a/libcrypto.a index cb6cb229..6f890f91 100644 Binary files a/Android/app/libs/arm64-v8a/libcrypto.a and b/Android/app/libs/arm64-v8a/libcrypto.a differ diff --git a/Android/app/libs/arm64-v8a/libssl.a b/Android/app/libs/arm64-v8a/libssl.a index a4c5d563..a9fc3f6c 100644 Binary files a/Android/app/libs/arm64-v8a/libssl.a and b/Android/app/libs/arm64-v8a/libssl.a differ diff --git a/Android/app/libs/armeabi-v7a/libcrypto.a b/Android/app/libs/armeabi-v7a/libcrypto.a index ca9dad21..29eeb767 100644 Binary files a/Android/app/libs/armeabi-v7a/libcrypto.a and b/Android/app/libs/armeabi-v7a/libcrypto.a differ diff --git a/Android/app/libs/armeabi-v7a/libssl.a b/Android/app/libs/armeabi-v7a/libssl.a index 766cbd1a..a3678fae 100644 Binary files a/Android/app/libs/armeabi-v7a/libssl.a and b/Android/app/libs/armeabi-v7a/libssl.a differ diff --git a/Android/app/libs/armeabi/libcrypto.a b/Android/app/libs/armeabi/libcrypto.a index cf6e027e..d481a036 100644 Binary files a/Android/app/libs/armeabi/libcrypto.a and b/Android/app/libs/armeabi/libcrypto.a differ diff --git a/Android/app/libs/armeabi/libssl.a b/Android/app/libs/armeabi/libssl.a index 8740b193..143a176b 100644 Binary files a/Android/app/libs/armeabi/libssl.a and b/Android/app/libs/armeabi/libssl.a differ diff --git a/Android/app/libs/x86/libcrypto.a b/Android/app/libs/x86/libcrypto.a index 28069371..4935e4f2 100644 Binary files a/Android/app/libs/x86/libcrypto.a and b/Android/app/libs/x86/libcrypto.a differ diff --git a/Android/app/libs/x86/libssl.a b/Android/app/libs/x86/libssl.a index 92de513f..5f68368d 100644 Binary files a/Android/app/libs/x86/libssl.a and b/Android/app/libs/x86/libssl.a differ diff --git a/Android/app/libs/x86_64/libcrypto.a b/Android/app/libs/x86_64/libcrypto.a index 8339bd3c..d3e4e826 100644 Binary files a/Android/app/libs/x86_64/libcrypto.a and b/Android/app/libs/x86_64/libcrypto.a differ diff --git a/Android/app/libs/x86_64/libssl.a b/Android/app/libs/x86_64/libssl.a index b6106ad9..1b96f397 100644 Binary files a/Android/app/libs/x86_64/libssl.a and b/Android/app/libs/x86_64/libssl.a differ diff --git a/README.md b/README.md index 0eef5ac4..d02ad8e3 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,11 @@ ## Why ZLMediaKit? - Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise. -- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV`),and support Inter-protocol conversion. +- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/Websocket-flv`),and support Inter-protocol conversion. - Multiplexing asynchronous network IO based on epoll and multi thread,extreme performance. - Well performance and stable test,can be used commercially. - Support linux, macos, ios, android, Windows Platforms. - Very low latency(lower then one second), video opened immediately. -- **Now Support websocket-flv!** ## Features @@ -118,6 +117,12 @@ - Apple OSX(Darwin), both 32 and 64bits. - All hardware with x86/x86_64/arm/mips cpu. - Windows. + +## How to build + +It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumbersome, and some features are not compiled by default. + +### Before build - **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.You can do it like this:** ``` git clone https://github.com/zlmediakit/ZLMediaKit.git @@ -125,12 +130,6 @@ cd ZLMediaKit git submodule update --init ``` - - -## How to build - -It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumbersome, and some features are not compiled by default. - ### Build on linux - My environment @@ -307,6 +306,17 @@ It is recommended to compile on Ubuntu or MacOS,compiling on windows is cumber }); ``` +## Docker Image +You can pull a pre-built docker image from Docker Hub and run with +```bash +docker run -id -p 1935:1935 -p 8080:80 gemfield/zlmediakit +``` + +Dockerfile is also supplied to build images on Ubuntu 16.04 +```bash +cd docker +docker build -t zlmediakit . +``` ## Mirrors diff --git a/README_CN.md b/README_CN.md index 918d068a..a2eba340 100644 --- a/README_CN.md +++ b/README_CN.md @@ -4,14 +4,13 @@ ## 项目特点 - 基于C++11开发,避免使用裸指针,代码稳定可靠;同时跨平台移植简单方便,代码清晰简洁。 -- 打包多种流媒体协议(RTSP/RTMP/HLS),支持协议间的互相转换,提供一站式的服务。 +- 打包多种流媒体协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV),支持协议间的互相转换,提供一站式的服务。 - 使用epoll+线程池+异步网络IO模式开发,并发性能优越。 - 已实现主流的的H264/H265+AAC流媒体方案,代码精简,脉络清晰,适合学习。 - 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式 - 代码经过大量的稳定性、性能测试,可满足商用服务器项目。 - 支持linux、macos、ios、android、windows平台 - 支持画面秒开(GOP缓存)、极低延时([500毫秒内,最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95)) -- **支持websocket-flv直播** - [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86) ## 项目定位 @@ -129,6 +128,9 @@ ## 编译要求 - 编译器支持C++11,GCC4.8/Clang3.3/VC2015或以上 - cmake3.2或以上 + +## 编译前必看!!! + - **必须使用git下载完整的代码,不要使用下载zip包的方式下载源码,否则子模块代码默认不下载!你可以像以下这样操作:** ``` git clone https://github.com/zlmediakit/ZLMediaKit.git diff --git a/conf/config.ini b/conf/config.ini index cbbcb96d..1ef7c145 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -9,7 +9,7 @@ secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc #FFmpeg可执行程序绝对路径 bin=/usr/local/bin/ffmpeg #FFmpeg拉流再推流的命令模板,通过该模板可以设置再编码的一些参数 -cmd=%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s +cmd=%s -re -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s #FFmpeg日志的路径,如果置空则不生成FFmpeg日志 #可以为相对(相对于本可执行程序目录)或绝对路径 log=./ffmpeg/ffmpeg.log @@ -36,6 +36,12 @@ addMuteAudio=1 #拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, #如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) resetWhenRePlay=1 +#是否默认推流时转换成rtsp或rtmp,hook接口(on_publish)中可以覆盖该设置 +publishToRtxp=1 +#是否默认推流时转换成hls,hook接口(on_publish)中可以覆盖该设置 +publishToHls=1 +#是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置 +publishToMP4=0 [hls] #hls写文件的buf大小,调整参数可以提高文件io性能 @@ -44,10 +50,12 @@ fileBufSize=65536 #可以为相对(相对于本可执行程序目录)或绝对路径 filePath=./httpRoot #hls最大切片时间 -segDur=3 +segDur=2 #m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个) #如果设置为0,则不删除切片,而是保存为点播 segNum=3 +#HLS切片从m3u8文件中移除后,继续保留在磁盘上的个数 +segRetain=5 [hook] #在推流时,如果url参数匹对admin_params,那么可以不经过hook鉴权直接推流成功,播放时亦然 diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..3f427538 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,36 @@ +FROM ubuntu:16.04 +#shell,rtmp,rtsp,rtsps,http,https +EXPOSE 9000/tcp +EXPOSE 1935/tcp +EXPOSE 554/tcp +EXPOSE 322/tcp +EXPOSE 80/tcp +EXPOSE 443/tcp + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + cmake \ + git \ + curl \ + ca-certificates \ + libssl-dev \ + libmysqlclient-dev \ + libx264-dev \ + libfaac-dev \ + libmp4v2-dev && \ + rm -rf /var/lib/apt/lists/* + +RUN mkdir -p /opt/media + +WORKDIR /opt/media +RUN git clone --depth=1 https://github.com/xiongziliang/ZLMediaKit && \ + cd ZLMediaKit && git submodule update --init --recursive && \ + mkdir -p build release/linux/Release/ + +WORKDIR /opt/media/ZLMediaKit/build +RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \ + make -j4 + +ENV PATH /opt/media/ZLMediaKit/release/linux/Release/:$PATH + +CMD MediaServer diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index e55e6dbb..81d98bd5 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -38,7 +38,7 @@ const char kLog[] = FFmpeg_FIELD"log"; onceToken token([]() { mINI::Instance()[kBin] = trim(System::execute("which ffmpeg")); - mINI::Instance()[kCmd] = "%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s"; + mINI::Instance()[kCmd] = "%s -re -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s"; mINI::Instance()[kLog] = "./ffmpeg/ffmpeg.log"; }); } @@ -48,7 +48,6 @@ FFmpegSource::FFmpegSource() { } FFmpegSource::~FFmpegSource() { - NoticeCenter::Instance().delListener(this, Broadcast::kBroadcastStreamNoneReader); DebugL; } @@ -83,6 +82,7 @@ void FFmpegSource::play(const string &src_url,const string &dst_url,int timeout_ if(src){ //推流给自己成功 cb(SockException()); + strongSelf->onGetMediaSource(src); strongSelf->startTimer(timeout_ms); return; } @@ -192,8 +192,7 @@ void FFmpegSource::startTimer(int timeout_ms) { //同步查找流 if (!src) { //流不在线,重新拉流 - strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, - [](const SockException &) {}); + strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {}); } }); } else { @@ -205,29 +204,35 @@ void FFmpegSource::startTimer(int timeout_ms) { } return true; }, _poller); - - NoticeCenter::Instance().delListener(this, Broadcast::kBroadcastStreamNoneReader); - NoticeCenter::Instance().addListener(this, Broadcast::kBroadcastStreamNoneReader,[weakSelf](BroadcastStreamNoneReaderArgs) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - //自身已经销毁 - return; - } - - if(sender.getVhost() != strongSelf->_media_info._vhost || - sender.getApp() != strongSelf->_media_info._app || - sender.getId() != strongSelf->_media_info._streamid){ - //不是自己感兴趣的事件,忽略之 - return; - } - - //该流无人观看,我们停止吧 - if(strongSelf->_onClose){ - strongSelf->_onClose(); - } - }); } void FFmpegSource::setOnClose(const function &cb){ _onClose = cb; -} \ No newline at end of file +} + +bool FFmpegSource::close(MediaSource &sender, bool force) { + auto listener = _listener.lock(); + if(listener && !listener->close(sender,force)){ + //关闭失败 + return false; + } + //该流无人观看,我们停止吧 + if(_onClose){ + _onClose(); + } + return true; +} + +void FFmpegSource::onNoneReader(MediaSource &sender) { + auto listener = _listener.lock(); + if(listener){ + listener->onNoneReader(sender); + }else{ + MediaSourceEvent::onNoneReader(sender); + } +} + +void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { + _listener = src->getListener(); + src->setListener(shared_from_this()); +} diff --git a/server/FFmpegSource.h b/server/FFmpegSource.h index 6be92fc3..5cf2c4f7 100644 --- a/server/FFmpegSource.h +++ b/server/FFmpegSource.h @@ -39,7 +39,7 @@ using namespace std; using namespace toolkit; using namespace mediakit; -class FFmpegSource : public std::enable_shared_from_this{ +class FFmpegSource : public std::enable_shared_from_this , public MediaSourceEvent{ public: typedef shared_ptr Ptr; typedef function onPlay; @@ -55,6 +55,10 @@ public: private: void findAsync(int maxWaitMS ,const function &cb); void startTimer(int timeout_ms); + void onGetMediaSource(const MediaSource::Ptr &src); + + bool close(MediaSource &sender,bool force) override; + void onNoneReader(MediaSource &sender) override ; private: Process _process; Timer::Ptr _timer; @@ -63,6 +67,7 @@ private: string _src_url; string _dst_url; function _onClose; + std::weak_ptr _listener; }; diff --git a/server/Process.cpp b/server/Process.cpp index 3b67145a..e215dc81 100644 --- a/server/Process.cpp +++ b/server/Process.cpp @@ -34,9 +34,8 @@ #include "Util/File.h" #include "Util/logger.h" #include "Util/uv_errno.h" -#include "Util/TimeTicker.h" +#include "Thread/WorkThreadPool.h" #include "Process.h" -#include "Poller/Timer.h" using namespace toolkit; void Process::run(const string &cmd, const string &log_file_tmp) { @@ -46,12 +45,11 @@ void Process::run(const string &cmd, const string &log_file_tmp) { throw std::runtime_error(StrPrinter << "fork child process falied,err:" << get_uv_errmsg()); } if (_pid == 0) { - //子进程 - //子进程关闭core文件生成 struct rlimit rlim = {0,0}; setrlimit(RLIMIT_CORE, &rlim); + //在启动子进程时,暂时禁用SIGINT、SIGTERM信号 // ignore the SIGINT and SIGTERM signal(SIGINT, SIG_IGN); signal(SIGTERM, SIG_IGN); @@ -109,24 +107,73 @@ void Process::run(const string &cmd, const string &log_file_tmp) { InfoL << "start child proces " << _pid; } -void Process::kill(int max_delay) { + +/** + * 获取进程是否存活状态 + * @param pid 进程号 + * @param exit_code_ptr 进程返回代码 + * @param block 是否阻塞等待 + * @return 进程是否还在运行 + */ +static bool s_wait(pid_t pid,int *exit_code_ptr,bool block) { + if (pid <= 0) { + return false; + } + int status = 0; + pid_t p = waitpid(pid, &status, block ? 0 : WNOHANG); + int exit_code = (status & 0xFF00) >> 8; + if(exit_code_ptr){ + *exit_code_ptr = (status & 0xFF00) >> 8; + } + if (p < 0) { + WarnL << "waitpid failed, pid=" << pid << ", err=" << get_uv_errmsg(); + return false; + } + if (p > 0) { + InfoL << "process terminated, pid=" << pid << ", exit code=" << exit_code; + return false; + } + //WarnL << "process is running, pid=" << _pid; + return true; +} + +static void s_kill(pid_t pid,int max_delay,bool force){ + if (pid <= 0) { + //pid无效 + return; + } + + if (::kill(pid, force ? SIGKILL : SIGTERM) == -1) { + //进程可能已经退出了 + WarnL << "kill process " << pid << " failed:" << get_uv_errmsg(); + return; + } + + if(force){ + //发送SIGKILL信号后,阻塞等待退出 + s_wait(pid, NULL, true); + DebugL << "force kill " << pid << " success!"; + return; + } + + //发送SIGTERM信号后,2秒后检查子进程是否已经退出 + WorkThreadPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){ + if (!s_wait(pid, nullptr, false)) { + //进程已经退出了 + return 0; + } + //进程还在运行 + WarnL << "process still working,force kill it:" << pid; + s_kill(pid,0, true); + return 0; + }); +} + +void Process::kill(int max_delay,bool force) { if (_pid <= 0) { return; } - if (::kill(_pid, SIGTERM) == -1) { - WarnL << "kill process " << _pid << " falied,err:" << get_uv_errmsg(); - } else { - //等待子进程退出 - auto pid = _pid; - EventPollerPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){ - //最多等待2秒,2秒后强制杀掉程序 - if (waitpid(pid, NULL, WNOHANG) == 0) { - ::kill(pid, SIGKILL); - WarnL << "force kill process " << pid; - } - return 0; - }); - } + s_kill(_pid,max_delay,force); _pid = -1; } @@ -134,28 +181,10 @@ Process::~Process() { kill(2000); } -Process::Process() { -} +Process::Process() {} bool Process::wait(bool block) { - if (_pid <= 0) { - return false; - } - int status = 0; - pid_t p = waitpid(_pid, &status, block ? 0 : WNOHANG); - - _exit_code = (status & 0xFF00) >> 8; - if (p < 0) { - WarnL << "waitpid failed, pid=" << _pid << ", err=" << get_uv_errmsg(); - return false; - } - if (p > 0) { - InfoL << "process terminated, pid=" << _pid << ", exit code=" << _exit_code; - return false; - } - - //WarnL << "process is running, pid=" << _pid; - return true; + return s_wait(_pid,&_exit_code,block); } int Process::exit_code() { diff --git a/server/Process.h b/server/Process.h index b0b994d9..cce8470a 100644 --- a/server/Process.h +++ b/server/Process.h @@ -36,7 +36,7 @@ public: Process(); ~Process(); void run(const string &cmd,const string &log_file); - void kill(int max_delay); + void kill(int max_delay,bool force = false); bool wait(bool block = true); int exit_code(); private: diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 3209b439..9a23da09 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -45,6 +45,7 @@ #include "Util/MD5.h" #include "WebApi.h" #include "WebHook.h" +#include "Thread/WorkThreadPool.h" #if !defined(_WIN32) #include "FFmpegSource.h" @@ -71,6 +72,8 @@ typedef map ApiArgsType; invoker("200 OK", headerOut, val.toStyledString()); \ }); +#define API_ARGS_VALUE sender,headerIn,headerOut,allArgs,val,invoker + #define API_REGIST_INVOKER(field, name, ...) \ s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker) __VA_ARGS__); @@ -180,19 +183,35 @@ static inline void addHttpListener(){ if(api_debug){ auto newInvoker = [invoker,parser,allArgs](const string &codeOut, const HttpSession::KeyValue &headerOut, - const string &contentOut){ + const HttpBody::Ptr &body){ stringstream ss; for(auto &pr : allArgs ){ ss << pr.first << " : " << pr.second << "\r\n"; } - DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" - << "# content:\r\n" << parser.Content() << "\r\n" - << "# args:\r\n" << ss.str() - << "# response:\r\n" - << contentOut << "\r\n"; + //body默认为空 + int64_t size = 0; + if (body && body->remainSize()) { + //有body,获取body大小 + size = body->remainSize(); + } - invoker(codeOut,headerOut,contentOut); + if(size < 4 * 1024){ + string contentOut = body->readData(size)->toString(); + DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" + << "# content:\r\n" << parser.Content() << "\r\n" + << "# args:\r\n" << ss.str() + << "# response:\r\n" + << contentOut << "\r\n"; + invoker(codeOut,headerOut,contentOut); + } else{ + DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" + << "# content:\r\n" << parser.Content() << "\r\n" + << "# args:\r\n" << ss.str() + << "# response size:" + << size <<"\r\n"; + invoker(codeOut,headerOut,body); + } }; ((HttpSession::HttpResponseInvoker &)invoker) = newInvoker; } @@ -277,6 +296,25 @@ void installWebApi() { obj["delay"] = vecDelay[i++]; val["data"].append(obj); } + val["code"] = API::Success; + invoker("200 OK", headerOut, val.toStyledString()); + }); + }); + + //获取后台工作线程负载 + //测试url http://127.0.0.1/index/api/getWorkThreadsLoad + API_REGIST_INVOKER(api, getWorkThreadsLoad, { + WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector &vecDelay) { + Value val; + auto vec = WorkThreadPool::Instance().getExecutorLoad(); + int i = 0; + for (auto load : vec) { + Value obj(objectValue); + obj["load"] = load; + obj["delay"] = vecDelay[i++]; + val["data"].append(obj); + } + val["code"] = API::Success; invoker("200 OK", headerOut, val.toStyledString()); }); }); @@ -313,7 +351,7 @@ void installWebApi() { } if (changed > 0) { NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig); - ini.dumpFile(); + ini.dumpFile(g_ini_file); } val["changed"] = changed; }); @@ -357,8 +395,6 @@ void installWebApi() { API_REGIST(api,getMediaList,{ CHECK_SECRET(); //获取所有MediaSource列表 - val["code"] = API::Success; - val["msg"] = "success"; MediaSource::for_each_media([&](const string &schema, const string &vhost, const string &app, @@ -382,6 +418,13 @@ void installWebApi() { }); }); + //测试url http://127.0.0.1/index/api/isMediaOnline?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs + API_REGIST(api,isMediaOnline,{ + CHECK_SECRET(); + CHECK_ARGS("schema","vhost","app","stream"); + val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"],false)); + }); + //主动关断流,包括关断拉流、推流 //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 API_REGIST(api,close_stream,{ @@ -394,14 +437,53 @@ void installWebApi() { allArgs["stream"]); if(src){ bool flag = src->close(allArgs["force"].as()); - val["code"] = flag ? 0 : -1; + val["result"] = flag ? 0 : -1; val["msg"] = flag ? "success" : "close failed"; }else{ - val["code"] = -2; + val["result"] = -2; val["msg"] = "can not find the stream"; } }); + //批量主动关断流,包括关断拉流、推流 + //测试url http://127.0.0.1/index/api/close_streams?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 + API_REGIST(api,close_streams,{ + CHECK_SECRET(); + //筛选命中个数 + int count_hit = 0; + int count_closed = 0; + list media_list; + MediaSource::for_each_media([&](const string &schema, + const string &vhost, + const string &app, + const string &stream, + const MediaSource::Ptr &media){ + if(!allArgs["schema"].empty() && allArgs["schema"] != schema){ + return; + } + if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){ + return; + } + if(!allArgs["app"].empty() && allArgs["app"] != app){ + return; + } + if(!allArgs["stream"].empty() && allArgs["stream"] != stream){ + return; + } + ++count_hit; + media_list.emplace_back(media); + }); + + bool force = allArgs["force"].as(); + for(auto &media : media_list){ + if(media->close(force)){ + ++count_closed; + } + } + val["count_hit"] = count_hit; + val["count_closed"] = count_closed; + }); + //获取所有TcpSession列表信息 //可以根据本地端口和远端ip来筛选 //测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935 @@ -412,7 +494,7 @@ void installWebApi() { string &peer_ip = allArgs["peer_ip"]; SessionMap::Instance().for_each_session([&](const string &id,const TcpSession::Ptr &session){ - if(local_port != API::Success && local_port != session->get_local_port()){ + if(local_port != 0 && local_port != session->get_local_port()){ return; } if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){ @@ -436,13 +518,36 @@ void installWebApi() { //踢掉tcp会话 auto session = SessionMap::Instance().get(allArgs["id"]); if(!session){ - val["code"] = API::OtherFailed; - val["msg"] = "can not find the target"; - return; + throw ApiRetException("can not find the target",API::OtherFailed); } session->safeShutdown(); - val["code"] = API::Success; - val["msg"] = "success"; + }); + + + //批量断开tcp连接,比如说可以断开rtsp、rtmp播放器等 + //测试url http://127.0.0.1/index/api/kick_sessions?local_port=1935 + API_REGIST(api,kick_sessions,{ + CHECK_SECRET(); + uint16_t local_port = allArgs["local_port"].as(); + string &peer_ip = allArgs["peer_ip"]; + uint64_t count_hit = 0; + + list session_list; + SessionMap::Instance().for_each_session([&](const string &id,const TcpSession::Ptr &session){ + if(local_port != 0 && local_port != session->get_local_port()){ + return; + } + if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){ + return; + } + session_list.emplace_back(session); + ++count_hit; + }); + + for(auto &session : session_list){ + session->safeShutdown(); + } + val["count_hit"] = (Json::UInt64)count_hit; }); static auto addStreamProxy = [](const string &vhost, @@ -520,7 +625,7 @@ void installWebApi() { }); #if !defined(_WIN32) - static auto addFFmepgSource = [](const string &src_url, + static auto addFFmpegSource = [](const string &src_url, const string &dst_url, int timeout_ms, const function &cb){ @@ -557,7 +662,7 @@ void installWebApi() { auto dst_url = allArgs["dst_url"]; int timeout_ms = allArgs["timeout_ms"]; - addFFmepgSource(src_url,dst_url,timeout_ms,[invoker,val,headerOut](const SockException &ex,const string &key){ + addFFmpegSource(src_url,dst_url,timeout_ms,[invoker,val,headerOut](const SockException &ex,const string &key){ if(ex){ const_cast(val)["code"] = API::OtherFailed; const_cast(val)["msg"] = ex.what(); @@ -568,16 +673,33 @@ void installWebApi() { }); }); - //关闭拉流代理 - //测试url http://127.0.0.1/index/api/delFFmepgSource?key=key - API_REGIST(api,delFFmepgSource,{ + + static auto api_delFFmpegSource = [](API_ARGS,const HttpSession::HttpResponseInvoker &invoker){ CHECK_SECRET(); CHECK_ARGS("key"); lock_guard lck(s_ffmpegMapMtx); val["data"]["flag"] = s_ffmpegMap.erase(allArgs["key"]) == 1; + }; + + //关闭拉流代理 + //测试url http://127.0.0.1/index/api/delFFmepgSource?key=key + API_REGIST(api,delFFmpegSource,{ + api_delFFmpegSource(API_ARGS_VALUE); + }); + + //此处为了兼容之前的拼写错误 + API_REGIST(api,delFFmepgSource,{ + api_delFFmpegSource(API_ARGS_VALUE); }); #endif + //新增http api下载可执行程序文件接口 + //测试url http://127.0.0.1/index/api/downloadBin + API_REGIST_INVOKER(api,downloadBin,{ + CHECK_SECRET(); + invoker.responseFile(headerIn,StrCaseMap(),exePath()); + }); + ////////////以下是注册的Hook API//////////// API_REGIST(hook,on_publish,{ //开始推流事件 @@ -636,7 +758,7 @@ void installWebApi() { << allArgs["stream"] << "?vhost=" << allArgs["vhost"]; - addFFmepgSource("http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8",/** ffmpeg拉流支持任意编码格式任意协议 **/ + addFFmpegSource("http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8",/** ffmpeg拉流支持任意编码格式任意协议 **/ dst_url, (1000 * timeout_sec) - 500, [invoker,val,headerOut](const SockException &ex,const string &key){ diff --git a/server/WebApi.h b/server/WebApi.h index c3ffecf1..2a917db3 100644 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -47,5 +47,7 @@ extern const string kPort; void installWebApi(); void unInstallWebApi(); +//配置文件路径 +extern string g_ini_file; #endif //ZLMEDIAKIT_WEBAPI_H diff --git a/server/WebHook.cpp b/server/WebHook.cpp index e4c8945e..1391d10a 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -195,7 +195,10 @@ void installWebHook(){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){ - invoker("",true, true,false); + GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); + GET_CONFIG(bool,toHls,General::kPublishToHls); + GET_CONFIG(bool,toMP4,General::kPublishToMP4); + invoker("",toRtxp,toHls,toMP4); return; } //异步执行该hook api,防止阻塞NoticeCenter diff --git a/server/main.cpp b/server/main.cpp index fe583513..82fbd0b9 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -148,7 +148,7 @@ public: Option::ArgRequired,/*该选项后面必须跟值*/ (exeDir() + "ssl.p12").data(),/*该选项默认值*/ false,/*该选项是否必须赋值,如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/ - "ssl证书路径,支持p12/pem类型",/*该选项说明文字*/ + "ssl证书文件或文件夹,支持p12/pem类型",/*该选项说明文字*/ nullptr); (*_parser) << Option('t',/*该选项简称,如果是\x00则说明无简称*/ @@ -205,6 +205,8 @@ static void inline listen_shell_input(){ } #endif//!defined(_WIN32) +//全局变量,在WebApi中用于保存配置文件用 +string g_ini_file; int start_main(int argc,char *argv[]) { { @@ -219,7 +221,7 @@ int start_main(int argc,char *argv[]) { bool bDaemon = cmd_main.hasKey("daemon"); LogLevel logLevel = (LogLevel) cmd_main["level"].as(); logLevel = MIN(MAX(logLevel, LTrace), LError); - static string ini_file = cmd_main["config"]; + g_ini_file = cmd_main["config"]; string ssl_file = cmd_main["ssl"]; int threads = cmd_main["threads"]; @@ -244,14 +246,21 @@ int start_main(int argc,char *argv[]) { //启动异步日志线程 Logger::Instance().setWriter(std::make_shared()); //加载配置文件,如果配置文件不存在就创建一个 - loadIniConfig(ini_file.data()); + loadIniConfig(g_ini_file.data()); - //加载证书,证书包含公钥和私钥 - SSL_Initor::Instance().loadCertificate(ssl_file.data()); - //信任某个自签名证书 - SSL_Initor::Instance().trustCertificate(ssl_file.data()); - //不忽略无效证书证书(例如自签名或过期证书) - SSL_Initor::Instance().ignoreInvalidCertificate(true); + if(!File::is_dir(ssl_file.data())){ + //不是文件夹,加载证书,证书包含公钥和私钥 + SSL_Initor::Instance().loadCertificate(ssl_file.data()); + }else{ + //加载文件夹下的所有证书 + File::scanDir(ssl_file,[](const string &path, bool isDir){ + if(!isDir){ + //最后的一个证书会当做默认证书(客户端ssl握手时未指定主机) + SSL_Initor::Instance().loadCertificate(path.data()); + } + return true; + }); + } uint16_t shellPort = mINI::Instance()[Shell::kPort]; uint16_t rtspPort = mINI::Instance()[Rtsp::kPort]; @@ -306,7 +315,7 @@ int start_main(int argc,char *argv[]) { });// 设置退出信号 #if !defined(_WIN32) - signal(SIGHUP, [](int) { mediakit::loadIniConfig(ini_file.data()); }); + signal(SIGHUP, [](int) { mediakit::loadIniConfig(g_ini_file.data()); }); #endif sem.wait(); } diff --git a/src/Common/Device.cpp b/src/Common/Device.cpp index d98683e1..a04dba72 100644 --- a/src/Common/Device.cpp +++ b/src/Common/Device.cpp @@ -32,6 +32,7 @@ #include "Util/TimeTicker.h" #include "Extension/AAC.h" #include "Extension/H264.h" +#include "Extension/H265.h" using namespace toolkit; @@ -106,6 +107,24 @@ void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32 inputFrame(std::make_shared((char *)pcData,iDataLen,dts,pts,prefixeSize)); } +void DevChannel::inputH265(const char* pcData, int iDataLen, uint32_t dts,uint32_t pts) { + if(dts == 0){ + dts = (uint32_t)_aTicker[0].elapsedTime(); + } + if(pts == 0){ + pts = dts; + } + int prefixeSize; + if (memcmp("\x00\x00\x00\x01", pcData, 4) == 0) { + prefixeSize = 4; + } else if (memcmp("\x00\x00\x01", pcData, 3) == 0) { + prefixeSize = 3; + } else { + prefixeSize = 0; + } + inputFrame(std::make_shared((char *)pcData,iDataLen,dts,pts,prefixeSize)); +} + void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) { if(withAdtsHeader){ inputAAC(pcData+7,iDataLen-7,uiStamp,pcData); @@ -135,6 +154,11 @@ void DevChannel::initVideo(const VideoInfo& info) { addTrack(std::make_shared()); } +void DevChannel::initH265Video(const VideoInfo &info){ + _video = std::make_shared(info); + addTrack(std::make_shared()); +} + void DevChannel::initAudio(const AudioInfo& info) { _audio = std::make_shared(info); addTrack(std::make_shared()); diff --git a/src/Common/Device.h b/src/Common/Device.h index 7e5e7684..a8c63daf 100644 --- a/src/Common/Device.h +++ b/src/Common/Device.h @@ -88,6 +88,12 @@ public: */ void initVideo(const VideoInfo &info); + /** + * 初始化h265视频Track + * @param info + */ + void initH265Video(const VideoInfo &info); + /** * 初始化aac音频Track * 相当于MultiMediaSourceMuxer::addTrack(AACTrack::Ptr ); @@ -104,6 +110,15 @@ public: */ void inputH264(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0); + /** + * 输入265帧 + * @param pcData 265单帧数据指针 + * @param iDataLen 数据指针长度 + * @param dts 解码时间戳,单位毫秒;等于0时内部会自动生成时间戳 + * @param pts 播放时间戳,单位毫秒;等于0时内部会赋值为dts + */ + void inputH265(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0); + /** * 输入可能带adts头的aac帧 * @param pcDataWithAdts 可能带adts头的aac帧 diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index a54bc8d8..5cfe597d 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -167,10 +167,15 @@ public: } listener->onNoneReader(*this); } + virtual void setListener(const std::weak_ptr &listener){ _listener = listener; } + std::weak_ptr getListener(){ + return _listener; + } + template static void for_each_media(FUN && fun){ lock_guard lock(g_mtxMediaSrc); diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 15540bcd..39336e69 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -79,6 +79,9 @@ const string kEnableVhost = GENERAL_FIELD"enableVhost"; const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay"; const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio"; const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay"; +const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp"; +const string kPublishToHls = GENERAL_FIELD"publishToHls"; +const string kPublishToMP4 = GENERAL_FIELD"publishToMP4"; onceToken token([](){ mINI::Instance()[kFlowThreshold] = 1024; @@ -88,6 +91,9 @@ onceToken token([](){ mINI::Instance()[kUltraLowDelay] = 1; mINI::Instance()[kAddMuteAudio] = 1; mINI::Instance()[kResetWhenRePlay] = 1; + mINI::Instance()[kPublishToRtxp] = 1; + mINI::Instance()[kPublishToHls] = 1; + mINI::Instance()[kPublishToMP4] = 0; },nullptr); }//namespace General @@ -95,57 +101,44 @@ onceToken token([](){ ////////////HTTP配置/////////// namespace Http { #define HTTP_FIELD "http." - //http 文件发送缓存大小 -#define HTTP_SEND_BUF_SIZE (64 * 1024) const string kSendBufSize = HTTP_FIELD"sendBufSize"; - //http 最大请求字节数 -#define HTTP_MAX_REQ_SIZE (4*1024) const string kMaxReqSize = HTTP_FIELD"maxReqSize"; - //http keep-alive秒数 -#define HTTP_KEEP_ALIVE_SECOND 10 const string kKeepAliveSecond = HTTP_FIELD"keepAliveSecond"; - //http keep-alive最大请求数 -#define HTTP_MAX_REQ_CNT 100 const string kMaxReqCount = HTTP_FIELD"maxReqCount"; - - //http 字符编码 -#if defined(_WIN32) -#define HTTP_CHAR_SET "gb2312" -#else -#define HTTP_CHAR_SET "utf-8" -#endif const string kCharSet = HTTP_FIELD"charSet"; - //http 服务器根目录 -#define HTTP_ROOT_PATH "./httpRoot" const string kRootPath = HTTP_FIELD"rootPath"; - //http 404错误提示内容 -#define HTTP_NOT_FOUND ""\ - "404 Not Found"\ - ""\ - "

您访问的资源不存在!

"\ - "
"\ - SERVER_NAME\ - "
"\ - ""\ - "" const string kNotFound = HTTP_FIELD"notFound"; - onceToken token([](){ - mINI::Instance()[kSendBufSize] = HTTP_SEND_BUF_SIZE; - mINI::Instance()[kMaxReqSize] = HTTP_MAX_REQ_SIZE; - mINI::Instance()[kKeepAliveSecond] = HTTP_KEEP_ALIVE_SECOND; - mINI::Instance()[kMaxReqCount] = HTTP_MAX_REQ_CNT; - mINI::Instance()[kCharSet] = HTTP_CHAR_SET; - mINI::Instance()[kRootPath] = HTTP_ROOT_PATH; - mINI::Instance()[kNotFound] = HTTP_NOT_FOUND; + mINI::Instance()[kSendBufSize] = 64 * 1024; + mINI::Instance()[kMaxReqSize] = 4*1024; + mINI::Instance()[kKeepAliveSecond] = 15; + mINI::Instance()[kMaxReqCount] = 100; + +#if defined(_WIN32) + mINI::Instance()[kCharSet] = "gb2312"; +#else + mINI::Instance()[kCharSet] ="utf-8"; +#endif + + mINI::Instance()[kRootPath] = "./httpRoot"; + mINI::Instance()[kNotFound] = + "" + "404 Not Found" + "" + "

您访问的资源不存在!

" + "
" + SERVER_NAME + "
" + "" + ""; },nullptr); }//namespace Http @@ -153,12 +146,10 @@ onceToken token([](){ ////////////SHELL配置/////////// namespace Shell { #define SHELL_FIELD "shell." - -#define SHELL_MAX_REQ_SIZE 1024 const string kMaxReqSize = SHELL_FIELD"maxReqSize"; onceToken token([](){ - mINI::Instance()[kMaxReqSize] = SHELL_MAX_REQ_SIZE; + mINI::Instance()[kMaxReqSize] = 1024; },nullptr); } //namespace Shell @@ -179,7 +170,6 @@ onceToken token([](){ mINI::Instance()[kDirectProxy] = 1; mINI::Instance()[kModifyStamp] = false; },nullptr); - } //namespace Rtsp ////////////RTMP服务器配置/////////// @@ -194,40 +184,28 @@ onceToken token([](){ mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15; },nullptr); - } //namespace RTMP ////////////RTP配置/////////// namespace Rtp { #define RTP_FIELD "rtp." - //RTP打包最大MTU,公网情况下更小 -#define RTP_VIDOE_MTU_SIZE 1400 const string kVideoMtuSize = RTP_FIELD"videoMtuSize"; - -#define RTP_Audio_MTU_SIZE 600 const string kAudioMtuSize = RTP_FIELD"audioMtuSize"; - //RTP排序缓存最大个数 -#define RTP_MAX_RTP_COUNT 50 const string kMaxRtpCount = RTP_FIELD"maxRtpCount"; - //如果RTP序列正确次数累计达到该数字就启动清空排序缓存 -#define RTP_CLEAR_COUNT 10 const string kClearCount = RTP_FIELD"clearCount"; - //最大RTP时间为13个小时,每13小时回环一次 -#define RTP_CYCLE_MS (13*60*60*1000) const string kCycleMS = RTP_FIELD"cycleMS"; - onceToken token([](){ - mINI::Instance()[kVideoMtuSize] = RTP_VIDOE_MTU_SIZE; - mINI::Instance()[kAudioMtuSize] = RTP_Audio_MTU_SIZE; - mINI::Instance()[kMaxRtpCount] = RTP_MAX_RTP_COUNT; - mINI::Instance()[kClearCount] = RTP_CLEAR_COUNT; - mINI::Instance()[kCycleMS] = RTP_CYCLE_MS; + mINI::Instance()[kVideoMtuSize] = 1400; + mINI::Instance()[kAudioMtuSize] = 600; + mINI::Instance()[kMaxRtpCount] = 50; + mINI::Instance()[kClearCount] = 10; + mINI::Instance()[kCycleMS] = 13*60*60*1000; },nullptr); } //namespace Rtsp @@ -239,88 +217,67 @@ const string kAddrMin = MULTI_FIELD"addrMin"; //组播分配截止地址 const string kAddrMax = MULTI_FIELD"addrMax"; //组播TTL -#define MULTI_UDP_TTL 64 const string kUdpTTL = MULTI_FIELD"udpTTL"; onceToken token([](){ mINI::Instance()[kAddrMin] = "239.0.0.0"; mINI::Instance()[kAddrMax] = "239.255.255.255"; - mINI::Instance()[kUdpTTL] = MULTI_UDP_TTL; + mINI::Instance()[kUdpTTL] = 64; },nullptr); - } //namespace MultiCast ////////////录像配置/////////// namespace Record { #define RECORD_FIELD "record." - //查看录像的应用名称 -#define RECORD_APP_NAME "record" const string kAppName = RECORD_FIELD"appName"; - //每次流化MP4文件的时长,单位毫秒 -#define RECORD_SAMPLE_MS 500 const string kSampleMS = RECORD_FIELD"sampleMS"; - //MP4文件录制大小,默认一个小时 -#define RECORD_FILE_SECOND (60*60) const string kFileSecond = RECORD_FIELD"fileSecond"; - //录制文件路径 -#define RECORD_FILE_PATH HTTP_ROOT_PATH const string kFilePath = RECORD_FIELD"filePath"; - //mp4文件写缓存大小 const string kFileBufSize = RECORD_FIELD"fileBufSize"; - //mp4录制完成后是否进行二次关键帧索引写入头部 const string kFastStart = RECORD_FIELD"fastStart"; - //mp4文件是否重头循环读取 const string kFileRepeat = RECORD_FIELD"fileRepeat"; onceToken token([](){ - mINI::Instance()[kAppName] = RECORD_APP_NAME; - mINI::Instance()[kSampleMS] = RECORD_SAMPLE_MS; - mINI::Instance()[kFileSecond] = RECORD_FILE_SECOND; - mINI::Instance()[kFilePath] = RECORD_FILE_PATH; + mINI::Instance()[kAppName] = "record"; + mINI::Instance()[kSampleMS] = 500; + mINI::Instance()[kFileSecond] = 60*60; + mINI::Instance()[kFilePath] = "./httpRoot"; mINI::Instance()[kFileBufSize] = 64 * 1024; mINI::Instance()[kFastStart] = false; mINI::Instance()[kFileRepeat] = false; },nullptr); - } //namespace Record ////////////HLS相关配置/////////// namespace Hls { #define HLS_FIELD "hls." - //HLS切片时长,单位秒 -#define HLS_SEGMENT_DURATION 3 const string kSegmentDuration = HLS_FIELD"segDur"; - //HLS切片个数 -#define HLS_SEGMENT_NUM 3 const string kSegmentNum = HLS_FIELD"segNum"; - +//HLS切片从m3u8文件中移除后,继续保留在磁盘上的个数 +const string kSegmentRetain = HLS_FIELD"segRetain"; //HLS文件写缓存大小 -#define HLS_FILE_BUF_SIZE (64 * 1024) const string kFileBufSize = HLS_FIELD"fileBufSize"; - //录制文件路径 -#define HLS_FILE_PATH (HTTP_ROOT_PATH) const string kFilePath = HLS_FIELD"filePath"; onceToken token([](){ - mINI::Instance()[kSegmentDuration] = HLS_SEGMENT_DURATION; - mINI::Instance()[kSegmentNum] = HLS_SEGMENT_NUM; - mINI::Instance()[kFileBufSize] = HLS_FILE_BUF_SIZE; - mINI::Instance()[kFilePath] = HLS_FILE_PATH; + mINI::Instance()[kSegmentDuration] = 2; + mINI::Instance()[kSegmentNum] = 3; + mINI::Instance()[kSegmentRetain] = 5; + mINI::Instance()[kFileBufSize] = 64 * 1024; + mINI::Instance()[kFilePath] = "./httpRoot"; },nullptr); - } //namespace Hls - namespace Client { const string kNetAdapter = "net_adapter"; const string kRtpType = "rtp_type"; @@ -331,7 +288,6 @@ const string kTimeoutMS = "protocol_timeout_ms"; const string kMediaTimeoutMS = "media_timeout_ms"; const string kBeatIntervalMS = "beat_interval_ms"; const string kMaxAnalysisMS = "max_analysis_ms"; - } } // namespace mediakit diff --git a/src/Common/config.h b/src/Common/config.h index a4557ea1..ba171c99 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -182,6 +182,12 @@ extern const string kAddMuteAudio; //拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, //如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) extern const string kResetWhenRePlay; +//是否默认推流时转换成rtsp或rtmp,hook接口(on_publish)中可以覆盖该设置 +extern const string kPublishToRtxp ; +//是否默认推流时转换成hls,hook接口(on_publish)中可以覆盖该设置 +extern const string kPublishToHls ; +//是否默认推流时mp4录像,hook接口(on_publish)中可以覆盖该设置 +extern const string kPublishToMP4 ; }//namespace General @@ -283,8 +289,10 @@ extern const string kFileRepeat; namespace Hls { //HLS切片时长,单位秒 extern const string kSegmentDuration; -//HLS切片个数,如果设置为0,则不删除切片,而是保存为点播 +//m3u8文件中HLS切片个数,如果设置为0,则不删除切片,而是保存为点播 extern const string kSegmentNum; +//HLS切片从m3u8文件中移除后,继续保留在磁盘上的个数 +extern const string kSegmentRetain; //HLS文件写缓存大小 extern const string kFileBufSize; //录制文件路径 diff --git a/src/Http/HttpBody.cpp b/src/Http/HttpBody.cpp new file mode 100644 index 00000000..9e6a0592 --- /dev/null +++ b/src/Http/HttpBody.cpp @@ -0,0 +1,257 @@ +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "HttpBody.h" +#include "Util/util.h" +#include "Util/uv_errno.h" +#include "Util/logger.h" +#include "HttpClient.h" +#ifndef _WIN32 +#include +#endif + +#ifndef _WIN32 +#define ENABLE_MMAP +#endif + +namespace mediakit { + +HttpStringBody::HttpStringBody(const string &str){ + _str = str; +} +uint64_t HttpStringBody::remainSize() { + return _str.size() - _offset; +} + +Buffer::Ptr HttpStringBody::readData(uint32_t size) { + size = MIN(remainSize(),size); + if(!size){ + //没有剩余字节了 + return nullptr; + } + auto ret = std::make_shared(_str,_offset,size); + _offset += size; + return ret; +} + +////////////////////////////////////////////////////////////////// +HttpFileBody::HttpFileBody(const string &filePath){ + std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { + if(fp){ + fclose(fp); + } + }); + if(!fp){ + init(fp,0,0); + }else{ + init(fp,0,HttpMultiFormBody::fileSize(fp.get())); + } +} + +HttpFileBody::HttpFileBody(const std::shared_ptr &fp, uint64_t offset, uint64_t max_size) { + init(fp,offset,max_size); +} + +void HttpFileBody::init(const std::shared_ptr &fp,uint64_t offset,uint64_t max_size){ + _fp = fp; + _max_size = max_size; +#ifdef ENABLE_MMAP + do { + if(!_fp){ + //文件不存在 + break; + } + int fd = fileno(fp.get()); + if (fd < 0) { + WarnL << "fileno failed:" << get_uv_errmsg(false); + break; + } + auto ptr = (char *) mmap(NULL, max_size, PROT_READ, MAP_SHARED, fd, offset); + if (ptr == MAP_FAILED) { + WarnL << "mmap failed:" << get_uv_errmsg(false); + break; + } + _map_addr.reset(ptr,[max_size,fp](char *ptr){ + munmap(ptr,max_size); + }); + } while (false); +#endif + if(!_map_addr && offset && fp.get()){ + //未映射,那么fseek设置偏移量 + fseek(fp.get(), offset, SEEK_SET); + } +} + + +class BufferMmap : public Buffer{ +public: + typedef std::shared_ptr Ptr; + BufferMmap(const std::shared_ptr &map_addr,uint64_t offset,int size){ + _map_addr = map_addr; + _data = map_addr.get() + offset; + _size = size; + }; + virtual ~BufferMmap(){}; + //返回数据长度 + char *data() const override { + return _data; + } + uint32_t size() const override{ + return _size; + } +private: + std::shared_ptr _map_addr; + char *_data; + uint32_t _size; +}; + +uint64_t HttpFileBody::remainSize() { + return _max_size - _offset; +} + +Buffer::Ptr HttpFileBody::readData(uint32_t size) { + size = MIN(remainSize(),size); + if(!size){ + //没有剩余字节了 + return nullptr; + } + if(!_map_addr){ + //fread模式 + int iRead; + auto ret = _pool.obtain(); + ret->setCapacity(size + 1); + do{ + iRead = fread(ret->data(), 1, size, _fp.get()); + }while(-1 == iRead && UV_EINTR == get_uv_error(false)); + + if(iRead > 0){ + //读到数据了 + ret->setSize(iRead); + _offset += iRead; + return std::move(ret); + } + //读取文件异常,文件真实长度小于声明长度 + _offset = _max_size; + WarnL << "read file err:" << get_uv_errmsg(); + return nullptr; + } + + //mmap模式 + auto ret = std::make_shared(_map_addr,_offset,size); + _offset += size; + return std::move(ret); +} + +////////////////////////////////////////////////////////////////// +HttpMultiFormBody::HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary){ + std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { + if(fp){ + fclose(fp); + } + }); + if(!fp){ + throw std::invalid_argument(StrPrinter << "open file failed:" << filePath << " " << get_uv_errmsg()); + } + _fileBody = std::make_shared(fp, 0, fileSize(fp.get())); + + auto fileName = filePath; + auto pos = filePath.rfind('/'); + if(pos != string::npos){ + fileName = filePath.substr(pos + 1); + } + _bodyPrefix = multiFormBodyPrefix(args,boundary,fileName); + _bodySuffix = multiFormBodySuffix(boundary); + _totalSize = _bodyPrefix.size() + _bodySuffix.size() + _fileBody->remainSize(); +} + +uint64_t HttpMultiFormBody::remainSize() { + return _totalSize - _offset; +} + +Buffer::Ptr HttpMultiFormBody::readData(uint32_t size){ + if(_bodyPrefix.size()){ + auto ret = std::make_shared(_bodyPrefix); + _offset += _bodyPrefix.size(); + _bodyPrefix.clear(); + return ret; + } + + if(_fileBody->remainSize()){ + auto ret = _fileBody->readData(size); + if(!ret){ + //读取文件出现异常,提前中断 + _offset = _totalSize; + }else{ + _offset += ret->size(); + } + return ret; + } + + if(_bodySuffix.size()){ + auto ret = std::make_shared(_bodySuffix); + _offset = _totalSize; + _bodySuffix.clear(); + return ret; + } + + return nullptr; +} + +string HttpMultiFormBody::multiFormBodySuffix(const string &boundary){ + string MPboundary = string("--") + boundary; + string endMPboundary = MPboundary + "--"; + _StrPrinter body; + body << "\r\n" << endMPboundary; + return body; +} + +uint64_t HttpMultiFormBody::fileSize(FILE *fp) { + auto current = ftell(fp); + fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */ + auto end = ftell(fp); /* 得到文件大小 */ + fseek(fp,current,SEEK_SET); + return end - current; +} + +string HttpMultiFormBody::multiFormContentType(const string &boundary){ + return StrPrinter << "multipart/form-data; boundary=" << boundary; +} + +string HttpMultiFormBody::multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName){ + string MPboundary = string("--") + boundary; + _StrPrinter body; + for(auto &pr : args){ + body << MPboundary << "\r\n"; + body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n"; + body << pr.second << "\r\n"; + } + body << MPboundary << "\r\n"; + body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n"; + body << "Content-Type: application/octet-stream\r\n\r\n" ; + return body; +} + +}//namespace mediakit diff --git a/src/Http/HttpBody.h b/src/Http/HttpBody.h new file mode 100644 index 00000000..d119cbc5 --- /dev/null +++ b/src/Http/HttpBody.h @@ -0,0 +1,145 @@ +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef ZLMEDIAKIT_FILEREADER_H +#define ZLMEDIAKIT_FILEREADER_H + +#include +#include +#include "Network/Buffer.h" +#include "Util/ResourcePool.h" +#include "Util/logger.h" + +using namespace std; +using namespace toolkit; + +#ifndef MIN +#define MIN(a,b) ((a) < (b) ? (a) : (b) ) +#endif //MIN + +namespace mediakit { + +/** + * http content部分基类定义 + */ +class HttpBody{ +public: + typedef std::shared_ptr Ptr; + HttpBody(){} + virtual ~HttpBody(){} + + /** + * 剩余数据大小 + */ + virtual uint64_t remainSize() { return 0;}; + + /** + * 读取一定字节数,返回大小可能小于size + * @param size 请求大小 + * @return 字节对象 + */ + virtual Buffer::Ptr readData(uint32_t size) { return nullptr;}; +}; + +/** + * string类型的content + */ +class HttpStringBody : public HttpBody{ +public: + typedef std::shared_ptr Ptr; + HttpStringBody(const string &str); + virtual ~HttpStringBody(){} + uint64_t remainSize() override ; + Buffer::Ptr readData(uint32_t size) override ; +private: + mutable string _str; + uint64_t _offset = 0; +}; + +/** + * 文件类型的content + */ +class HttpFileBody : public HttpBody{ +public: + typedef std::shared_ptr Ptr; + + /** + * 构造函数 + * @param fp 文件句柄,文件的偏移量必须为0 + * @param offset 相对文件头的偏移量 + * @param max_size 最大读取字节数,未判断是否大于文件真实大小 + */ + HttpFileBody(const std::shared_ptr &fp,uint64_t offset,uint64_t max_size); + HttpFileBody(const string &file_path); + ~HttpFileBody(){}; + + uint64_t remainSize() override ; + Buffer::Ptr readData(uint32_t size) override; +private: + void init(const std::shared_ptr &fp,uint64_t offset,uint64_t max_size); +private: + std::shared_ptr _fp; + uint64_t _max_size; + uint64_t _offset = 0; + std::shared_ptr _map_addr; + ResourcePool _pool; +}; + +class HttpArgs; + +/** + * http MultiForm 方式提交的http content + */ +class HttpMultiFormBody : public HttpBody { +public: + typedef std::shared_ptr Ptr; + + /** + * 构造函数 + * @param args http提交参数列表 + * @param filePath 文件路径 + * @param boundary boundary字符串 + */ + HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary = "0xKhTmLbOuNdArY"); + virtual ~HttpMultiFormBody(){} + uint64_t remainSize() override ; + Buffer::Ptr readData(uint32_t size) override; +public: + static string multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName); + static string multiFormBodySuffix(const string &boundary); + static uint64_t fileSize(FILE *fp); + static string multiFormContentType(const string &boundary); +private: + string _bodyPrefix; + string _bodySuffix; + uint64_t _offset = 0; + uint64_t _totalSize; + HttpFileBody::Ptr _fileBody; +}; + +}//namespace mediakit + +#endif //ZLMEDIAKIT_FILEREADER_H diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index d9eafee0..c7483b33 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -242,8 +242,9 @@ void HttpClient::onRecvContent(const char *data, uint64_t len) { void HttpClient::onFlush() { _aliveTicker.resetTime(); + GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); while (_body && _body->remainSize() && !isSocketBusy()) { - auto buffer = _body->readData(); + auto buffer = _body->readData(sendBufSize); if (!buffer) { //数据发送结束或读取数据异常 break; diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 1f8287e5..59ab4629 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -39,7 +39,7 @@ #include "HttpCookie.h" #include "HttpChunkedSplitter.h" #include "strCoding.h" - +#include "HttpBody.h" using namespace std; using namespace toolkit; @@ -64,145 +64,6 @@ public: } }; -class HttpBody{ -public: - typedef std::shared_ptr Ptr; - HttpBody(){} - virtual ~HttpBody(){} - //剩余数据大小 - virtual uint64_t remainSize() = 0; - virtual Buffer::Ptr readData() = 0; -}; - -class HttpStringBody : public HttpBody{ -public: - typedef std::shared_ptr Ptr; - HttpStringBody(const string &str){ - _str = str; - } - virtual ~HttpStringBody(){} - - uint64_t remainSize() override { - return _str.size(); - } - Buffer::Ptr readData() override { - auto ret = std::make_shared(_str); - _str.clear(); - return ret; - } -private: - mutable string _str; -}; - - -class HttpMultiFormBody : public HttpBody { -public: - typedef std::shared_ptr Ptr; - template - HttpMultiFormBody(const MapType &args,const string &filePath,const string &boundary,uint32_t sliceSize = 4 * 1024){ - _fp = fopen(filePath.data(),"rb"); - if(!_fp){ - throw std::invalid_argument(StrPrinter << "打开文件失败:" << filePath << " " << get_uv_errmsg()); - } - auto fileName = filePath; - auto pos = filePath.rfind('/'); - if(pos != string::npos){ - fileName = filePath.substr(pos + 1); - } - _bodyPrefix = multiFormBodyPrefix(args,boundary,fileName); - _bodySuffix = multiFormBodySuffix(boundary); - _totalSize = _bodyPrefix.size() + _bodySuffix.size() + fileSize(_fp); - _sliceSize = sliceSize; - } - virtual ~HttpMultiFormBody(){ - fclose(_fp); - } - - uint64_t remainSize() override { - return _totalSize - _offset; - } - - Buffer::Ptr readData() override{ - if(_bodyPrefix.size()){ - auto ret = std::make_shared(_bodyPrefix); - _offset += _bodyPrefix.size(); - _bodyPrefix.clear(); - return ret; - } - - if(0 == feof(_fp)){ - auto ret = std::make_shared(_sliceSize); - //读文件 - int size; - do{ - size = fread(ret->data(),1,_sliceSize,_fp); - }while(-1 == size && UV_EINTR == get_uv_error(false)); - - if(size == -1){ - _offset = _totalSize; - WarnL << "fread failed:" << get_uv_errmsg(); - return nullptr; - } - _offset += size; - ret->setSize(size); - return ret; - } - - if(_bodySuffix.size()){ - auto ret = std::make_shared(_bodySuffix); - _offset = _totalSize; - _bodySuffix.clear(); - return ret; - } - - return nullptr; - } - -public: - template - static string multiFormBodyPrefix(const MapType &args,const string &boundary,const string &fileName){ - string MPboundary = string("--") + boundary; - _StrPrinter body; - for(auto &pr : args){ - body << MPboundary << "\r\n"; - body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n"; - body << pr.second << "\r\n"; - } - body << MPboundary << "\r\n"; - body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n"; - body << "Content-Type: application/octet-stream\r\n\r\n" ; - return body; - } - static string multiFormBodySuffix(const string &boundary){ - string MPboundary = string("--") + boundary; - string endMPboundary = MPboundary + "--"; - _StrPrinter body; - body << "\r\n" << endMPboundary; - return body; - } - - static uint64_t fileSize(FILE *fp) { - auto current = ftell(fp); - fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */ - auto end = ftell(fp); /* 得到文件大小 */ - fseek(fp,current,SEEK_SET); - return end - current; - } - - static string multiFormContentType(const string &boundary){ - return StrPrinter << "multipart/form-data; boundary=" << boundary; - } -private: - FILE *_fp; - string _bodyPrefix; - string _bodySuffix; - uint64_t _offset = 0; - uint64_t _totalSize; - uint32_t _sliceSize; -}; - - - class HttpClient : public TcpClient , public HttpRequestSplitter { public: diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 32765c84..10d6b08d 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -59,49 +59,138 @@ string dateStr() { strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt)); return buf; } -static const char* -get_mime_type(const char* name) { - const char* dot; - dot = strrchr(name, '.'); - static HttpSession::KeyValue mapType; - static onceToken token([&]() { - mapType.emplace(".html","text/html"); - mapType.emplace(".htm","text/html"); - mapType.emplace(".mp4","video/mp4"); - mapType.emplace(".m3u8","application/vnd.apple.mpegurl"); - mapType.emplace(".jpg","image/jpeg"); - mapType.emplace(".jpeg","image/jpeg"); - mapType.emplace(".gif","image/gif"); - mapType.emplace(".png","image/png"); - mapType.emplace(".ico","image/x-icon"); - mapType.emplace(".css","text/css"); - mapType.emplace(".js","application/javascript"); - mapType.emplace(".au","audio/basic"); - mapType.emplace(".wav","audio/wav"); - mapType.emplace(".avi","video/x-msvideo"); - mapType.emplace(".mov","video/quicktime"); - mapType.emplace(".qt","video/quicktime"); - mapType.emplace(".mpeg","video/mpeg"); - mapType.emplace(".mpe","video/mpeg"); - mapType.emplace(".vrml","model/vrml"); - mapType.emplace(".wrl","model/vrml"); - mapType.emplace(".midi","audio/midi"); - mapType.emplace(".mid","audio/midi"); - mapType.emplace(".mp3","audio/mpeg"); - mapType.emplace(".ogg","application/ogg"); - mapType.emplace(".pac","application/x-ns-proxy-autoconfig"); - mapType.emplace(".flv","video/x-flv"); - }, nullptr); - if(!dot){ - return "text/plain"; - } - auto it = mapType.find(dot); - if (it == mapType.end()) { - return "text/plain"; - } - return it->second.data(); + +const char *HttpSession::get_mime_type(const char *name) { + const char *dot; + dot = strrchr(name, '.'); + static HttpSession::KeyValue mapType; + static onceToken token([&]() { + mapType.emplace(".html", "text/html"); + mapType.emplace(".htm", "text/html"); + mapType.emplace(".mp4", "video/mp4"); + mapType.emplace(".mkv", "video/x-matroska"); + mapType.emplace(".rmvb", "application/vnd.rn-realmedia"); + mapType.emplace(".rm", "application/vnd.rn-realmedia"); + mapType.emplace(".m3u8", "application/vnd.apple.mpegurl"); + mapType.emplace(".jpg", "image/jpeg"); + mapType.emplace(".jpeg", "image/jpeg"); + mapType.emplace(".gif", "image/gif"); + mapType.emplace(".png", "image/png"); + mapType.emplace(".ico", "image/x-icon"); + mapType.emplace(".css", "text/css"); + mapType.emplace(".js", "application/javascript"); + mapType.emplace(".au", "audio/basic"); + mapType.emplace(".wav", "audio/wav"); + mapType.emplace(".avi", "video/x-msvideo"); + mapType.emplace(".mov", "video/quicktime"); + mapType.emplace(".qt", "video/quicktime"); + mapType.emplace(".mpeg", "video/mpeg"); + mapType.emplace(".mpe", "video/mpeg"); + mapType.emplace(".vrml", "model/vrml"); + mapType.emplace(".wrl", "model/vrml"); + mapType.emplace(".midi", "audio/midi"); + mapType.emplace(".mid", "audio/midi"); + mapType.emplace(".mp3", "audio/mpeg"); + mapType.emplace(".ogg", "application/ogg"); + mapType.emplace(".pac", "application/x-ns-proxy-autoconfig"); + mapType.emplace(".flv", "video/x-flv"); + }, nullptr); + if (!dot) { + return "text/plain"; + } + auto it = mapType.find(dot); + if (it == mapType.end()) { + return "text/plain"; + } + return it->second.data(); } +//////////////////////////////////////////////////////////////////////////////////////////////////// + +void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{ + if(_lambad){ + _lambad(codeOut,headerOut,body); + } +} + +void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{ + this->operator()(codeOut,headerOut,std::make_shared(body)); +} + +HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){ + _lambad = lambda; +} + +HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){ + if(!lambda){ + _lambad = nullptr; + return; + } + _lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){ + string str; + if(body && body->remainSize()){ + str = body->readData(body->remainSize())->toString(); + } + lambda(codeOut,headerOut,str); + }; +} + +void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader, + const StrCaseMap &responseHeader, + const string &filePath) const { + StrCaseMap &httpHeader = const_cast(responseHeader); + do { + std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { + if (fp) { + fclose(fp); + } + }); + if (!fp) { + //打开文件失败 + break; + } + + auto &strRange = const_cast(requestHeader)["Range"]; + int64_t iRangeStart = 0; + int64_t iRangeEnd = 0 ; + int64_t fileSize = HttpMultiFormBody::fileSize(fp.get()); + + const char *pcHttpResult = NULL; + if (strRange.size() == 0) { + //全部下载 + pcHttpResult = "200 OK"; + iRangeEnd = fileSize - 1; + } else { + //分节下载 + pcHttpResult = "206 Partial Content"; + iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data()); + iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data()); + if (iRangeEnd == 0) { + iRangeEnd = fileSize - 1; + } + //分节下载返回Content-Range头 + httpHeader.emplace("Content-Range", StrPrinter << "bytes " << iRangeStart << "-" << iRangeEnd << "/" << fileSize << endl); + } + + //回复文件 + HttpBody::Ptr fileBody = std::make_shared(fp, iRangeStart, iRangeEnd - iRangeStart + 1); + (*this)(pcHttpResult, httpHeader, fileBody); + return; + }while(false); + + GET_CONFIG(string,notFound,Http::kNotFound); + GET_CONFIG(string,charSet,Http::kCharSet); + + auto strContentType = StrPrinter << "text/html; charset=" << charSet << endl; + httpHeader["Content-Type"] = strContentType; + (*this)("404 Not Found", httpHeader, notFound); +} + +HttpResponseInvokerImp::operator bool(){ + return _lambad.operator bool(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { TraceP(this); @@ -128,7 +217,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { string cmd = _parser.Method(); auto it = g_mapCmdIndex.find(cmd); if (it == g_mapCmdIndex.end()) { - sendResponse("403 Forbidden", makeHttpHeader(true), ""); + sendResponse("403 Forbidden", true); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd)); return 0; } @@ -169,21 +258,32 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onError(const SockException& err) { + if(_is_flv_stream){ + //flv播放器 + WarnP(this) << "播放器(" + << _mediaInfo._vhost << "/" + << _mediaInfo._app << "/" + << _mediaInfo._streamid + << ")断开:" << err.what(); + + GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); + if(_ui64TotalBytes > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, + _mediaInfo, + _ui64TotalBytes, + _ticker.createdTime()/1000, + true, + *this); + } + return; + } + + //http客户端 if(_ticker.createdTime() < 10 * 1000){ TraceP(this) << err.what(); }else{ WarnP(this) << err.what(); } - - GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, - _mediaInfo, - _ui64TotalBytes, - _ticker.createdTime()/1000, - true, - *this); - } } void HttpSession::onManager() { @@ -202,7 +302,7 @@ bool HttpSession::checkWebSocket(){ } auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); - KeyValue headerOut = makeHttpHeader(); + KeyValue headerOut; headerOut["Upgrade"] = "websocket"; headerOut["Connection"] = "Upgrade"; headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept; @@ -212,7 +312,7 @@ bool HttpSession::checkWebSocket(){ auto res_cb = [this,headerOut](){ _flv_over_websocket = true; - sendResponse("101 Switching Protocols",headerOut,""); + sendResponse("101 Switching Protocols",false,nullptr,headerOut,nullptr,false); }; //判断是否为websocket-flv @@ -223,11 +323,11 @@ bool HttpSession::checkWebSocket(){ //如果checkLiveFlvStream返回false,则代表不是websocket-flv,而是普通的websocket连接 if(!onWebSocketConnect(_parser)){ - sendResponse("501 Not Implemented",headerOut,""); + sendResponse("501 Not Implemented",true, nullptr, headerOut); shutdown(SockException(Err_shutdown,"WebSocket server not implemented")); return true; } - sendResponse("101 Switching Protocols",headerOut,""); + sendResponse("101 Switching Protocols",false, nullptr,headerOut); return true; } //http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 @@ -274,14 +374,14 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ auto onRes = [this,rtmp_src,cb](const string &err){ bool authSuccess = err.empty(); if(!authSuccess){ - sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); + sendResponse("401 Unauthorized", true, nullptr, KeyValue(), std::make_shared(err)); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); return ; } if(!cb) { //找到rtmp源,发送http头,负载后续发送 - sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), ""); + sendResponse("200 OK", false, "video/x-flv",KeyValue(),nullptr,false); }else{ cb(); } @@ -291,6 +391,7 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ try{ start(getPoller(),rtmp_src); + _is_flv_stream = true; }catch (std::exception &ex){ //该rtmp源不存在 shutdown(SockException(Err_shutdown,"rtmp mediasource released")); @@ -485,16 +586,15 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { return; } - //先看看该http事件是否被拦截 if(emitHttpEvent(false)){ + //拦截http api事件 return; } - //再看看是否为http-flv直播请求 - if(checkLiveFlvStream()){ - //若是,return! - return; - } + if(checkLiveFlvStream()){ + //拦截http-flv播放器 + return; + } //事件未被拦截,则认为是http下载请求 auto fullUrl = string(HTTP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl(); @@ -530,150 +630,38 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { if(!errMsg.empty()){ const_cast(strMeun) = errMsg; } - auto headerOut = makeHttpHeader(bClose,strMeun.size()); + KeyValue headerOut; if(cookie){ headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" , headerOut, strMeun); + sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" ,bClose, "text/html", headerOut, std::make_shared(strMeun)); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder"); }); return; } }while(0); - //访问的是文件 - struct stat tFileStat; - if (0 != stat(strFile.data(), &tFileStat)) { - //文件不存在 - sendNotFound(bClose); - throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on file"); - } - //文件智能指针,防止退出时未关闭 - std::shared_ptr pFilePtr(fopen(strFile.data(), "rb"), [](FILE *pFile) { - if(pFile){ - fclose(pFile); - } - }); - - if (!pFilePtr) { - //打开文件失败 - sendNotFound(bClose); - throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on open file failed"); - } - auto parser = _parser; //判断是否有权限访问该文件 - canAccessPath(_parser.Url(),false,[this,parser,tFileStat,pFilePtr,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){ + canAccessPath(_parser.Url(),false,[this,parser,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){ if(!errMsg.empty()){ - auto headerOut = makeHttpHeader(bClose,errMsg.size()); + KeyValue headerOut; if(cookie){ headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - sendResponse("401 Unauthorized" , headerOut, errMsg); + sendResponse("401 Unauthorized" ,bClose, nullptr, headerOut, std::make_shared(errMsg)); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed"); } - //判断是不是分节下载 - auto &strRange = parser["Range"]; - int64_t iRangeStart = 0, iRangeEnd = 0; - iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data()); - iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data()); - if (iRangeEnd == 0) { - iRangeEnd = tFileStat.st_size - 1; - } - auto httpHeader = makeHttpHeader(bClose, iRangeEnd - iRangeStart + 1, get_mime_type(strFile.data())); - const char *pcHttpResult = NULL; - if (strRange.size() == 0) { - //全部下载 - pcHttpResult = "200 OK"; - } else { - //分节下载 - pcHttpResult = "206 Partial Content"; - fseek(pFilePtr.get(), iRangeStart, SEEK_SET); - //分节下载返回Content-Range头 - httpHeader.emplace("Content-Range",StrPrinter<<"bytes " << iRangeStart << "-" << iRangeEnd << "/" << tFileStat.st_size<< endl); - } - + KeyValue httpHeader; if(cookie){ httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - //先回复HTTP头部分 - sendResponse(pcHttpResult,httpHeader,""); - - if (iRangeEnd - iRangeStart < 0) { - //文件是空的! - throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file"); - } - //回复Content部分 - std::shared_ptr piLeft(new int64_t(iRangeEnd - iRangeStart + 1)); - GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto onFlush = [pFilePtr,bClose,weakSelf,piLeft]() { - TimeTicker(); - auto strongSelf = weakSelf.lock(); - while(*piLeft && strongSelf){ - //更新超时计时器 - strongSelf->_ticker.resetTime(); - //从循环池获取一个内存片 - auto sendBuf = strongSelf->obtainBuffer(); - sendBuf->setCapacity(sendBufSize); - //本次需要读取文件字节数 - int64_t iReq = MIN(sendBufSize,*piLeft); - //读文件 - int iRead; - do{ - iRead = fread(sendBuf->data(), 1, iReq, pFilePtr.get()); - }while(-1 == iRead && UV_EINTR == get_uv_error(false)); - //文件剩余字节数 - *piLeft -= iRead; - - if (iRead < iReq || !*piLeft) { - //文件读完 - if(iRead > 0) { - sendBuf->setSize(iRead); - strongSelf->send(sendBuf); - } - - if(strongSelf->isSocketBusy()){ - //套接字忙,我们等待触发下一次onFlush事件 - //待所有数据flush到socket fd再移除onFlush事件监听 - //标记文件读写完毕 - *piLeft = 0; - return true; - } - - //文件全部flush到socket fd,可以直接关闭socket了 - break; - } - - //文件还未读完 - sendBuf->setSize(iRead); - if(strongSelf->send(sendBuf) == -1) { - //socket已经销毁,不再监听onFlush事件 - return false; - } - if(strongSelf->isSocketBusy()){ - //socket忙,那么停止继续写,等待下一次onFlush事件,然后再读文件写socket - return true; - } - //socket还可写,继续写socket - } - - if(bClose && strongSelf) { - //最后一次flush事件,文件也发送完毕了,可以关闭socket了 - strongSelf->shutdown(SockException(Err_shutdown,"read file eof")); - } - //不再监听onFlush事件 - return false; + HttpResponseInvoker invoker = [this,bClose,&strFile](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){ + sendResponse(codeOut.data(), bClose, get_mime_type(strFile.data()), headerOut, body); }; - - //文件下载提升发送性能 - setSocketFlags(); - - onFlush(); - _sock->setOnFlush(onFlush); + invoker.responseFile(parser.getValues(),httpHeader,strFile); }); } @@ -776,43 +764,137 @@ bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) return true; } -void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) { - _StrPrinter printer; - printer << "HTTP/1.1 " << pcStatus << "\r\n"; - for (auto &pr : header) { - printer << pr.first << ": " << pr.second << "\r\n"; - } - printer << "\r\n" << strContent; - auto strSend = printer << endl; - send(strSend); - _ticker.resetTime(); -} -HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { - KeyValue headerOut; +void HttpSession::sendResponse(const char *pcStatus, + bool bClose, + const char *pcContentType, + const HttpSession::KeyValue &header, + const HttpBody::Ptr &body, + bool set_content_len ){ + GET_CONFIG(string,charSet,Http::kCharSet); GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); - headerOut.emplace("Date", dateStr()); - headerOut.emplace("Server", SERVER_NAME); - headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); - if(!bClose){ - headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); - } - if(pcContentType){ - auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; - headerOut.emplace("Content-Type",strContentType); - } - if(iContentSize > 0){ - headerOut.emplace("Content-Length", StrPrinter<remainSize()) { + //有body,获取body大小 + size = body->remainSize(); + if (size >= INT64_MAX) { + //不固定长度的body,那么不设置content-length字段 + size = -1; + } + } + + if(!set_content_len || size == -1){ + //如果是不定长度body,或者不设置conten-length, + //那么一定是Keep-Alive类型 + bClose = false; + } + + HttpSession::KeyValue &headerOut = const_cast(header); + headerOut.emplace("Date", dateStr()); + headerOut.emplace("Server", SERVER_NAME); + headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); + if(!bClose){ + headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); + } if(!_origin.empty()){ + //设置跨域 headerOut.emplace("Access-Control-Allow-Origin",_origin); headerOut.emplace("Access-Control-Allow-Credentials", "true"); } - return headerOut; + + if(set_content_len && size >= 0){ + //文件长度为定值或者,且不是http-flv强制设置Content-Length + headerOut["Content-Length"] = StrPrinter << size << endl; + } + + if(size && !pcContentType){ + //有body时,设置缺省类型 + pcContentType = "text/plain"; + } + + if(size && pcContentType){ + //有body时,设置文件类型 + auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; + headerOut.emplace("Content-Type",strContentType); + } + + //发送http头 + _StrPrinter printer; + printer << "HTTP/1.1 " << pcStatus << "\r\n"; + for (auto &pr : header) { + printer << pr.first << ": " << pr.second << "\r\n"; + } + + printer << "\r\n"; + send(printer << endl); + _ticker.resetTime(); + + if(!size){ + //没有body + if(bClose){ + shutdown(SockException(Err_shutdown,"close connection after send http header completed")); + } + return; + } + + //发送http body + GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + + auto onFlush = [body,bClose,weakSelf]() { + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + //本对象已经销毁 + return false; + } + while(true){ + //更新超时计时器 + strongSelf->_ticker.resetTime(); + //读取文件 + auto sendBuf = body->readData(sendBufSize); + if (!sendBuf) { + //文件读完 + if(strongSelf->isSocketBusy() && bClose){ + //套接字忙,我们等待触发下一次onFlush事件 + //待所有数据flush到socket fd再移除onFlush事件监听 + //标记文件读写完毕 + return true; + } + //文件全部flush到socket fd,可以直接关闭socket了 + break; + } + + //文件还未读完 + if(strongSelf->send(sendBuf) == -1) { + //socket已经销毁,不再监听onFlush事件 + return false; + } + if(strongSelf->isSocketBusy()){ + //socket忙,那么停止继续写,等待下一次onFlush事件,然后再读文件写socket + return true; + } + //socket还可写,继续写socket + } + + if(bClose) { + //最后一次flush事件,文件也发送完毕了,可以关闭socket了 + strongSelf->shutdown(SockException(Err_shutdown,"close connection after send http body completed")); + } + //不再监听onFlush事件 + return false; + }; + + if(body->remainSize() > sendBufSize){ + //文件下载提升发送性能 + setSocketFlags(); + } + onFlush(); + _sock->setOnFlush(onFlush); } string HttpSession::urlDecode(const string &str){ @@ -841,20 +923,25 @@ bool HttpSession::emitHttpEvent(bool doInvoke){ bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); /////////////////////异步回复Invoker/////////////////////////////// weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const string &contentOut){ + HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){ auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() { + strongSelf->async([weakSelf,bClose,codeOut,headerOut,body]() { auto strongSelf = weakSelf.lock(); if(!strongSelf) { + //本对象已经销毁 return; } - strongSelf->responseDelay(bClose,codeOut,headerOut,contentOut); - if(bClose){ - strongSelf->shutdown(SockException(Err_shutdown,"Connection: close")); - } + + if(codeOut.empty()){ + //回调提供的参数异常 + strongSelf->sendNotFound(bClose); + return; + } + + strongSelf->sendResponse(codeOut.data(), bClose, nullptr, headerOut, body); }); }; ///////////////////广播HTTP事件/////////////////////////// @@ -862,7 +949,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){ NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this); if(!consumed && doInvoke){ //该事件无人消费,所以返回404 - invoker("404 Not Found",KeyValue(),""); + invoker("404 Not Found",KeyValue(), HttpBody::Ptr()); if(bClose){ //close类型,回复完毕,关闭连接 shutdown(SockException(Err_shutdown,"404 Not Found")); @@ -943,25 +1030,10 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) { } //有后续content数据要处理,暂时不关闭连接 } -void HttpSession::responseDelay(bool bClose, - const string &codeOut, - const KeyValue &headerOut, - const string &contentOut){ - if(codeOut.empty()){ - sendNotFound(bClose); - return; - } - auto headerOther = makeHttpHeader(bClose,contentOut.size(),"text/plain"); - for (auto &pr : headerOther){ - //添加默认http头,默认http头不能覆盖用户自定义的头 - const_cast(headerOut).emplace(pr.first,pr.second); - } - sendResponse(codeOut.data(), headerOut, contentOut); -} void HttpSession::sendNotFound(bool bClose) { GET_CONFIG(string,notFound,Http::kNotFound); - sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); + sendResponse("404 Not Found", bClose,"text/html",KeyValue(),std::make_shared(notFound)); } void HttpSession::setSocketFlags(){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index e6199668..98f234c3 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -36,21 +36,44 @@ #include "HttpRequestSplitter.h" #include "WebSocketSplitter.h" #include "HttpCookieManager.h" +#include "HttpBody.h" +#include "Util/function_traits.h" using namespace std; using namespace toolkit; namespace mediakit { +/** + * 该类实现与老代码的兼容适配 + */ +class HttpResponseInvokerImp{ +public: + typedef std::function HttpResponseInvokerLambda0; + typedef std::function HttpResponseInvokerLambda1; + + HttpResponseInvokerImp(){} + ~HttpResponseInvokerImp(){} + template + HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename function_traits::stl_function_type(c)) {} + HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda); + HttpResponseInvokerImp(const HttpResponseInvokerLambda1 &lambda); + + void operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const; + void operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const; + void responseFile(const StrCaseMap &requestHeader,const StrCaseMap &responseHeader,const string &filePath) const; + operator bool(); +private: + HttpResponseInvokerLambda0 _lambad; +}; + class HttpSession: public TcpSession, public FlvMuxer, public HttpRequestSplitter, public WebSocketSplitter { public: typedef StrCaseMap KeyValue; - typedef std::function HttpResponseInvoker; + typedef HttpResponseInvokerImp HttpResponseInvoker; /** * @param errMsg 如果为空,则代表鉴权通过,否则为错误提示 @@ -67,6 +90,7 @@ public: virtual void onManager() override; static string urlDecode(const string &str); + static const char* get_mime_type(const char* name); protected: //FlvMuxer override void onWrite(const Buffer::Ptr &data) override ; @@ -118,13 +142,9 @@ private: bool emitHttpEvent(bool doInvoke); void urlDecode(Parser &parser); void sendNotFound(bool bClose); - void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); - KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); - void responseDelay(bool bClose, - const string &codeOut, - const KeyValue &headerOut, - const string &contentOut); - + void sendResponse(const char *pcStatus, bool bClose, const char *pcContentType = nullptr, + const HttpSession::KeyValue &header = HttpSession::KeyValue(), + const HttpBody::Ptr &body = nullptr,bool set_content_len = true); /** * 判断http客户端是否有权限访问文件的逻辑步骤 * @@ -160,6 +180,7 @@ private: //处理content数据的callback function _contentCallBack; bool _flv_over_websocket = false; + bool _is_flv_stream = false; }; diff --git a/src/MediaFile/HlsMaker.cpp b/src/MediaFile/HlsMaker.cpp index 959fcc36..3788f139 100644 --- a/src/MediaFile/HlsMaker.cpp +++ b/src/MediaFile/HlsMaker.cpp @@ -93,13 +93,14 @@ void HlsMaker::delOldSegment() { return; } //在hls m3u8索引文件中,我们保存的切片个数跟_seg_number相关设置一致 - if (_file_index >= _seg_number + 2) { + if (_file_index > _seg_number) { _seg_dur_list.pop_front(); } - //但是实际保存的切片个数比m3u8所述多两个,这样做的目的是防止播放器在切片删除前能下载完毕 - if (_file_index >= _seg_number + 4) { - onDelSegment(_file_index - _seg_number - 4); + GET_CONFIG(uint32_t,segRetain,Hls::kSegmentRetain); + //但是实际保存的切片个数比m3u8所述多若干个,这样做的目的是防止播放器在切片删除前能下载完毕 + if (_file_index > _seg_number + segRetain) { + onDelSegment(_file_index - _seg_number - segRetain - 1); } } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 2cd38528..e946d275 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -44,13 +44,17 @@ RtmpSession::~RtmpSession() { } void RtmpSession::onError(const SockException& err) { - WarnP(this) << err.what(); + bool isPlayer = !_pPublisherSrc; + WarnP(this) << (isPlayer ? "播放器(" : "推流器(") + << _mediaInfo._vhost << "/" + << _mediaInfo._app << "/" + << _mediaInfo._streamid + << ")断开:" << err.what(); //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - bool isPlayer = !_pPublisherSrc; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, @@ -194,7 +198,10 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { *this); if(!flag){ //该事件无人监听,默认鉴权成功 - onRes("",true,true,false); + GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); + GET_CONFIG(bool,toHls,General::kPublishToHls); + GET_CONFIG(bool,toMP4,General::kPublishToMP4); + onRes("",toRtxp,toHls,toMP4); } } diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index bd3b83b9..aa7dfce3 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -105,7 +105,7 @@ void RtspPlayer::play(const string &strUrl){ } void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType ) { - DebugL << strUrl << " " + DebugL << strUrl << " " << (strUser.size() ? strUser : "null") << " " << (strPwd.size() ? strPwd:"null") << " " << eType; @@ -122,7 +122,7 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co _eType = eType; auto ip = FindField(strUrl.data(), "://", "/"); - if (!ip.size()) { + if (ip.empty()) { ip = split(FindField(strUrl.data(), "://", NULL),"?")[0]; } auto port = atoi(FindField(ip.data(), ":", NULL).data()); @@ -134,6 +134,11 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co ip = FindField(ip.data(), NULL, ":"); } + if(ip.empty()){ + onPlayResult_l(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl)); + return; + } + _strUrl = strUrl; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -272,6 +277,13 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){ throw std::runtime_error("open rtcp sock failed"); } } + + if(rtpSockRef->get_local_port() % 2 != 0){ + //如果rtp端口不是偶数,那么与rtcp端口互换,目的是兼容一些要求严格的服务器 + Socket::Ptr tmp = rtpSockRef; + rtpSockRef = rtcpSockRef; + rtcpSockRef = tmp; + } } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index efd0dc7e..14e2b89a 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -85,7 +85,13 @@ RtspSession::~RtspSession() { } void RtspSession::onError(const SockException& err) { - WarnP(this) << err.what(); + bool isPlayer = !_pushSrc; + WarnP(this) << (isPlayer ? "播放器(" : "推流器(") + << _mediaInfo._vhost << "/" + << _mediaInfo._app << "/" + << _mediaInfo._streamid + << ")断开:" << err.what(); + if (_rtpType == Rtsp::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); @@ -100,7 +106,6 @@ void RtspSession::onError(const SockException& err) { //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ - bool isPlayer = !_pushSrc; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, @@ -319,8 +324,11 @@ void RtspSession::handleReq_RECORD(const Parser &parser){ auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this); if(!flag){ //该事件无人监听,默认不鉴权 - onRes("",true,true,false); - } + GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); + GET_CONFIG(bool,toHls,General::kPublishToHls); + GET_CONFIG(bool,toMP4,General::kPublishToMP4); + onRes("",toRtxp,toHls,toMP4); + } } void RtspSession::handleReq_Describe(const Parser &parser) {