Merge pull request #1 from xiongziliang/master

tb
This commit is contained in:
kqbi 2019-11-19 14:26:18 +08:00 committed by GitHub
commit fd0937dd41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 1254 additions and 607 deletions

2
.gitattributes vendored
View File

@ -1,4 +1,2 @@
release/ filter=lfs diff=lfs merge=lfs -text
*.a filter=lfs diff=lfs merge=lfs -text
*.h linguist-language=cpp *.h linguist-language=cpp
*.c linguist-language=cpp *.c linguist-language=cpp

25
.github/workflows/ccpp.yml vendored Normal file
View File

@ -0,0 +1,25 @@
name: C/C++ CI
on: [push]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: 下载submodule源码
run: git submodule update --init
- name: apt-get安装依赖库(非必选)
run: sudo apt-get install -y cmake libmysqlclient-dev libssl-dev libx264-dev libfaac-dev libmp4v2-dev libsdl-dev libavcodec-dev libavutil-dev
- name: 编译
run: mkdir -p linux_build && cd linux_build && cmake .. && make -j4
- name: 运行MediaServer
run: pwd && cd release/linux/Debug && sudo ./MediaServer -d &

@ -1 +1 @@
Subproject commit 4a6029b74b4f2339e32b8c546388de51e4ec1bcb Subproject commit 628d3b2527f63b54a5eb38b9e9973254d4a2192b

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -8,12 +8,11 @@
## Why ZLMediaKit? ## Why ZLMediaKit?
- Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise. - Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise.
- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV`),and support Inter-protocol conversion. - Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/Websocket-flv`),and support Inter-protocol conversion.
- Multiplexing asynchronous network IO based on epoll and multi threadextreme performance. - Multiplexing asynchronous network IO based on epoll and multi threadextreme performance.
- Well performance and stable test,can be used commercially. - Well performance and stable test,can be used commercially.
- Support linux, macos, ios, android, Windows Platforms. - Support linux, macos, ios, android, Windows Platforms.
- Very low latency(lower then one second), video opened immediately. - Very low latency(lower then one second), video opened immediately.
- **Now Support websocket-flv!**
## Features ## Features
@ -118,6 +117,12 @@
- Apple OSX(Darwin), both 32 and 64bits. - Apple OSX(Darwin), both 32 and 64bits.
- All hardware with x86/x86_64/arm/mips cpu. - All hardware with x86/x86_64/arm/mips cpu.
- Windows. - Windows.
## How to build
It is recommended to compile on Ubuntu or MacOScompiling on windows is cumbersome, and some features are not compiled by default.
### Before build
- **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.You can do it like this:** - **You must use git to clone the complete code. Do not download the source code by downloading zip package. Otherwise, the sub-module code will not be downloaded by default.You can do it like this:**
``` ```
git clone https://github.com/zlmediakit/ZLMediaKit.git git clone https://github.com/zlmediakit/ZLMediaKit.git
@ -125,12 +130,6 @@ cd ZLMediaKit
git submodule update --init git submodule update --init
``` ```
## How to build
It is recommended to compile on Ubuntu or MacOScompiling on windows is cumbersome, and some features are not compiled by default.
### Build on linux ### Build on linux
- My environment - My environment
@ -307,6 +306,17 @@ It is recommended to compile on Ubuntu or MacOScompiling on windows is cumber
}); });
``` ```
## Docker Image
You can pull a pre-built docker image from Docker Hub and run with
```bash
docker run -id -p 1935:1935 -p 8080:80 gemfield/zlmediakit
```
Dockerfile is also supplied to build images on Ubuntu 16.04
```bash
cd docker
docker build -t zlmediakit .
```
## Mirrors ## Mirrors

View File

@ -4,14 +4,13 @@
## 项目特点 ## 项目特点
- 基于C++11开发避免使用裸指针代码稳定可靠同时跨平台移植简单方便代码清晰简洁。 - 基于C++11开发避免使用裸指针代码稳定可靠同时跨平台移植简单方便代码清晰简洁。
- 打包多种流媒体协议(RTSP/RTMP/HLS支持协议间的互相转换提供一站式的服务。 - 打包多种流媒体协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV),支持协议间的互相转换,提供一站式的服务。
- 使用epoll+线程池+异步网络IO模式开发并发性能优越。 - 使用epoll+线程池+异步网络IO模式开发并发性能优越。
- 已实现主流的的H264/H265+AAC流媒体方案代码精简,脉络清晰,适合学习。 - 已实现主流的的H264/H265+AAC流媒体方案代码精简,脉络清晰,适合学习。
- 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式 - 编码格式与框架代码解耦,方便自由简洁的添加支持其他编码格式
- 代码经过大量的稳定性、性能测试,可满足商用服务器项目。 - 代码经过大量的稳定性、性能测试,可满足商用服务器项目。
- 支持linux、macos、ios、android、windows平台 - 支持linux、macos、ios、android、windows平台
- 支持画面秒开(GOP缓存)、极低延时([500毫秒内最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95)) - 支持画面秒开(GOP缓存)、极低延时([500毫秒内最低可达100毫秒](https://github.com/zlmediakit/ZLMediaKit/wiki/%E5%BB%B6%E6%97%B6%E6%B5%8B%E8%AF%95))
- **支持websocket-flv直播**
- [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86) - [ZLMediaKit高并发实现原理](https://github.com/xiongziliang/ZLMediaKit/wiki/ZLMediaKit%E9%AB%98%E5%B9%B6%E5%8F%91%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86)
## 项目定位 ## 项目定位
@ -129,6 +128,9 @@
## 编译要求 ## 编译要求
- 编译器支持C++11GCC4.8/Clang3.3/VC2015或以上 - 编译器支持C++11GCC4.8/Clang3.3/VC2015或以上
- cmake3.2或以上 - cmake3.2或以上
## 编译前必看!!!
- **必须使用git下载完整的代码不要使用下载zip包的方式下载源码否则子模块代码默认不下载你可以像以下这样操作:** - **必须使用git下载完整的代码不要使用下载zip包的方式下载源码否则子模块代码默认不下载你可以像以下这样操作:**
``` ```
git clone https://github.com/zlmediakit/ZLMediaKit.git git clone https://github.com/zlmediakit/ZLMediaKit.git

View File

@ -9,7 +9,7 @@ secret=035c73f7-bb6b-4889-a715-d9eb2d1925cc
#FFmpeg可执行程序绝对路径 #FFmpeg可执行程序绝对路径
bin=/usr/local/bin/ffmpeg bin=/usr/local/bin/ffmpeg
#FFmpeg拉流再推流的命令模板通过该模板可以设置再编码的一些参数 #FFmpeg拉流再推流的命令模板通过该模板可以设置再编码的一些参数
cmd=%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s cmd=%s -re -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s
#FFmpeg日志的路径如果置空则不生成FFmpeg日志 #FFmpeg日志的路径如果置空则不生成FFmpeg日志
#可以为相对(相对于本可执行程序目录)或绝对路径 #可以为相对(相对于本可执行程序目录)或绝对路径
log=./ffmpeg/ffmpeg.log log=./ffmpeg/ffmpeg.log
@ -36,6 +36,12 @@ addMuteAudio=1
#拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, #拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始,
#如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) #如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写)
resetWhenRePlay=1 resetWhenRePlay=1
#是否默认推流时转换成rtsp或rtmphook接口(on_publish)中可以覆盖该设置
publishToRtxp=1
#是否默认推流时转换成hlshook接口(on_publish)中可以覆盖该设置
publishToHls=1
#是否默认推流时mp4录像hook接口(on_publish)中可以覆盖该设置
publishToMP4=0
[hls] [hls]
#hls写文件的buf大小调整参数可以提高文件io性能 #hls写文件的buf大小调整参数可以提高文件io性能
@ -44,10 +50,12 @@ fileBufSize=65536
#可以为相对(相对于本可执行程序目录)或绝对路径 #可以为相对(相对于本可执行程序目录)或绝对路径
filePath=./httpRoot filePath=./httpRoot
#hls最大切片时间 #hls最大切片时间
segDur=3 segDur=2
#m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个) #m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个)
#如果设置为0则不删除切片而是保存为点播 #如果设置为0则不删除切片而是保存为点播
segNum=3 segNum=3
#HLS切片从m3u8文件中移除后继续保留在磁盘上的个数
segRetain=5
[hook] [hook]
#在推流时如果url参数匹对admin_params那么可以不经过hook鉴权直接推流成功播放时亦然 #在推流时如果url参数匹对admin_params那么可以不经过hook鉴权直接推流成功播放时亦然

36
docker/Dockerfile Normal file
View File

@ -0,0 +1,36 @@
FROM ubuntu:16.04
#shell,rtmp,rtsp,rtsps,http,https
EXPOSE 9000/tcp
EXPOSE 1935/tcp
EXPOSE 554/tcp
EXPOSE 322/tcp
EXPOSE 80/tcp
EXPOSE 443/tcp
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
cmake \
git \
curl \
ca-certificates \
libssl-dev \
libmysqlclient-dev \
libx264-dev \
libfaac-dev \
libmp4v2-dev && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/media
WORKDIR /opt/media
RUN git clone --depth=1 https://github.com/xiongziliang/ZLMediaKit && \
cd ZLMediaKit && git submodule update --init --recursive && \
mkdir -p build release/linux/Release/
WORKDIR /opt/media/ZLMediaKit/build
RUN cmake -DCMAKE_BUILD_TYPE=Release .. && \
make -j4
ENV PATH /opt/media/ZLMediaKit/release/linux/Release/:$PATH
CMD MediaServer

View File

@ -38,7 +38,7 @@ const char kLog[] = FFmpeg_FIELD"log";
onceToken token([]() { onceToken token([]() {
mINI::Instance()[kBin] = trim(System::execute("which ffmpeg")); mINI::Instance()[kBin] = trim(System::execute("which ffmpeg"));
mINI::Instance()[kCmd] = "%s -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s"; mINI::Instance()[kCmd] = "%s -re -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s";
mINI::Instance()[kLog] = "./ffmpeg/ffmpeg.log"; mINI::Instance()[kLog] = "./ffmpeg/ffmpeg.log";
}); });
} }
@ -48,7 +48,6 @@ FFmpegSource::FFmpegSource() {
} }
FFmpegSource::~FFmpegSource() { FFmpegSource::~FFmpegSource() {
NoticeCenter::Instance().delListener(this, Broadcast::kBroadcastStreamNoneReader);
DebugL; DebugL;
} }
@ -83,6 +82,7 @@ void FFmpegSource::play(const string &src_url,const string &dst_url,int timeout_
if(src){ if(src){
//推流给自己成功 //推流给自己成功
cb(SockException()); cb(SockException());
strongSelf->onGetMediaSource(src);
strongSelf->startTimer(timeout_ms); strongSelf->startTimer(timeout_ms);
return; return;
} }
@ -192,8 +192,7 @@ void FFmpegSource::startTimer(int timeout_ms) {
//同步查找流 //同步查找流
if (!src) { if (!src) {
//流不在线,重新拉流 //流不在线,重新拉流
strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {});
[](const SockException &) {});
} }
}); });
} else { } else {
@ -205,29 +204,35 @@ void FFmpegSource::startTimer(int timeout_ms) {
} }
return true; return true;
}, _poller); }, _poller);
NoticeCenter::Instance().delListener(this, Broadcast::kBroadcastStreamNoneReader);
NoticeCenter::Instance().addListener(this, Broadcast::kBroadcastStreamNoneReader,[weakSelf](BroadcastStreamNoneReaderArgs) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//自身已经销毁
return;
}
if(sender.getVhost() != strongSelf->_media_info._vhost ||
sender.getApp() != strongSelf->_media_info._app ||
sender.getId() != strongSelf->_media_info._streamid){
//不是自己感兴趣的事件,忽略之
return;
}
//该流无人观看,我们停止吧
if(strongSelf->_onClose){
strongSelf->_onClose();
}
});
} }
void FFmpegSource::setOnClose(const function<void()> &cb){ void FFmpegSource::setOnClose(const function<void()> &cb){
_onClose = cb; _onClose = cb;
} }
bool FFmpegSource::close(MediaSource &sender, bool force) {
auto listener = _listener.lock();
if(listener && !listener->close(sender,force)){
//关闭失败
return false;
}
//该流无人观看,我们停止吧
if(_onClose){
_onClose();
}
return true;
}
void FFmpegSource::onNoneReader(MediaSource &sender) {
auto listener = _listener.lock();
if(listener){
listener->onNoneReader(sender);
}else{
MediaSourceEvent::onNoneReader(sender);
}
}
void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
_listener = src->getListener();
src->setListener(shared_from_this());
}

View File

@ -39,7 +39,7 @@ using namespace std;
using namespace toolkit; using namespace toolkit;
using namespace mediakit; using namespace mediakit;
class FFmpegSource : public std::enable_shared_from_this<FFmpegSource>{ class FFmpegSource : public std::enable_shared_from_this<FFmpegSource> , public MediaSourceEvent{
public: public:
typedef shared_ptr<FFmpegSource> Ptr; typedef shared_ptr<FFmpegSource> Ptr;
typedef function<void(const SockException &ex)> onPlay; typedef function<void(const SockException &ex)> onPlay;
@ -55,6 +55,10 @@ public:
private: private:
void findAsync(int maxWaitMS ,const function<void(const MediaSource::Ptr &src)> &cb); void findAsync(int maxWaitMS ,const function<void(const MediaSource::Ptr &src)> &cb);
void startTimer(int timeout_ms); void startTimer(int timeout_ms);
void onGetMediaSource(const MediaSource::Ptr &src);
bool close(MediaSource &sender,bool force) override;
void onNoneReader(MediaSource &sender) override ;
private: private:
Process _process; Process _process;
Timer::Ptr _timer; Timer::Ptr _timer;
@ -63,6 +67,7 @@ private:
string _src_url; string _src_url;
string _dst_url; string _dst_url;
function<void()> _onClose; function<void()> _onClose;
std::weak_ptr<MediaSourceEvent> _listener;
}; };

View File

@ -34,9 +34,8 @@
#include "Util/File.h" #include "Util/File.h"
#include "Util/logger.h" #include "Util/logger.h"
#include "Util/uv_errno.h" #include "Util/uv_errno.h"
#include "Util/TimeTicker.h" #include "Thread/WorkThreadPool.h"
#include "Process.h" #include "Process.h"
#include "Poller/Timer.h"
using namespace toolkit; using namespace toolkit;
void Process::run(const string &cmd, const string &log_file_tmp) { void Process::run(const string &cmd, const string &log_file_tmp) {
@ -46,12 +45,11 @@ void Process::run(const string &cmd, const string &log_file_tmp) {
throw std::runtime_error(StrPrinter << "fork child process falied,err:" << get_uv_errmsg()); throw std::runtime_error(StrPrinter << "fork child process falied,err:" << get_uv_errmsg());
} }
if (_pid == 0) { if (_pid == 0) {
//子进程
//子进程关闭core文件生成 //子进程关闭core文件生成
struct rlimit rlim = {0,0}; struct rlimit rlim = {0,0};
setrlimit(RLIMIT_CORE, &rlim); setrlimit(RLIMIT_CORE, &rlim);
//在启动子进程时暂时禁用SIGINT、SIGTERM信号
// ignore the SIGINT and SIGTERM // ignore the SIGINT and SIGTERM
signal(SIGINT, SIG_IGN); signal(SIGINT, SIG_IGN);
signal(SIGTERM, SIG_IGN); signal(SIGTERM, SIG_IGN);
@ -109,24 +107,73 @@ void Process::run(const string &cmd, const string &log_file_tmp) {
InfoL << "start child proces " << _pid; InfoL << "start child proces " << _pid;
} }
void Process::kill(int max_delay) {
/**
*
* @param pid
* @param exit_code_ptr
* @param block
* @return
*/
static bool s_wait(pid_t pid,int *exit_code_ptr,bool block) {
if (pid <= 0) {
return false;
}
int status = 0;
pid_t p = waitpid(pid, &status, block ? 0 : WNOHANG);
int exit_code = (status & 0xFF00) >> 8;
if(exit_code_ptr){
*exit_code_ptr = (status & 0xFF00) >> 8;
}
if (p < 0) {
WarnL << "waitpid failed, pid=" << pid << ", err=" << get_uv_errmsg();
return false;
}
if (p > 0) {
InfoL << "process terminated, pid=" << pid << ", exit code=" << exit_code;
return false;
}
//WarnL << "process is running, pid=" << _pid;
return true;
}
static void s_kill(pid_t pid,int max_delay,bool force){
if (pid <= 0) {
//pid无效
return;
}
if (::kill(pid, force ? SIGKILL : SIGTERM) == -1) {
//进程可能已经退出了
WarnL << "kill process " << pid << " failed:" << get_uv_errmsg();
return;
}
if(force){
//发送SIGKILL信号后阻塞等待退出
s_wait(pid, NULL, true);
DebugL << "force kill " << pid << " success!";
return;
}
//发送SIGTERM信号后2秒后检查子进程是否已经退出
WorkThreadPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){
if (!s_wait(pid, nullptr, false)) {
//进程已经退出了
return 0;
}
//进程还在运行
WarnL << "process still working,force kill it:" << pid;
s_kill(pid,0, true);
return 0;
});
}
void Process::kill(int max_delay,bool force) {
if (_pid <= 0) { if (_pid <= 0) {
return; return;
} }
if (::kill(_pid, SIGTERM) == -1) { s_kill(_pid,max_delay,force);
WarnL << "kill process " << _pid << " falied,err:" << get_uv_errmsg();
} else {
//等待子进程退出
auto pid = _pid;
EventPollerPool::Instance().getPoller()->doDelayTask(max_delay,[pid](){
//最多等待2秒,2秒后强制杀掉程序
if (waitpid(pid, NULL, WNOHANG) == 0) {
::kill(pid, SIGKILL);
WarnL << "force kill process " << pid;
}
return 0;
});
}
_pid = -1; _pid = -1;
} }
@ -134,28 +181,10 @@ Process::~Process() {
kill(2000); kill(2000);
} }
Process::Process() { Process::Process() {}
}
bool Process::wait(bool block) { bool Process::wait(bool block) {
if (_pid <= 0) { return s_wait(_pid,&_exit_code,block);
return false;
}
int status = 0;
pid_t p = waitpid(_pid, &status, block ? 0 : WNOHANG);
_exit_code = (status & 0xFF00) >> 8;
if (p < 0) {
WarnL << "waitpid failed, pid=" << _pid << ", err=" << get_uv_errmsg();
return false;
}
if (p > 0) {
InfoL << "process terminated, pid=" << _pid << ", exit code=" << _exit_code;
return false;
}
//WarnL << "process is running, pid=" << _pid;
return true;
} }
int Process::exit_code() { int Process::exit_code() {

View File

@ -36,7 +36,7 @@ public:
Process(); Process();
~Process(); ~Process();
void run(const string &cmd,const string &log_file); void run(const string &cmd,const string &log_file);
void kill(int max_delay); void kill(int max_delay,bool force = false);
bool wait(bool block = true); bool wait(bool block = true);
int exit_code(); int exit_code();
private: private:

View File

@ -45,6 +45,7 @@
#include "Util/MD5.h" #include "Util/MD5.h"
#include "WebApi.h" #include "WebApi.h"
#include "WebHook.h" #include "WebHook.h"
#include "Thread/WorkThreadPool.h"
#if !defined(_WIN32) #if !defined(_WIN32)
#include "FFmpegSource.h" #include "FFmpegSource.h"
@ -71,6 +72,8 @@ typedef map<string,variant,StrCaseCompare> ApiArgsType;
invoker("200 OK", headerOut, val.toStyledString()); \ invoker("200 OK", headerOut, val.toStyledString()); \
}); });
#define API_ARGS_VALUE sender,headerIn,headerOut,allArgs,val,invoker
#define API_REGIST_INVOKER(field, name, ...) \ #define API_REGIST_INVOKER(field, name, ...) \
s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker) __VA_ARGS__); s_map_api.emplace("/index/"#field"/"#name,[](API_ARGS,const HttpSession::HttpResponseInvoker &invoker) __VA_ARGS__);
@ -180,19 +183,35 @@ static inline void addHttpListener(){
if(api_debug){ if(api_debug){
auto newInvoker = [invoker,parser,allArgs](const string &codeOut, auto newInvoker = [invoker,parser,allArgs](const string &codeOut,
const HttpSession::KeyValue &headerOut, const HttpSession::KeyValue &headerOut,
const string &contentOut){ const HttpBody::Ptr &body){
stringstream ss; stringstream ss;
for(auto &pr : allArgs ){ for(auto &pr : allArgs ){
ss << pr.first << " : " << pr.second << "\r\n"; ss << pr.first << " : " << pr.second << "\r\n";
} }
//body默认为空
int64_t size = 0;
if (body && body->remainSize()) {
//有body获取body大小
size = body->remainSize();
}
if(size < 4 * 1024){
string contentOut = body->readData(size)->toString();
DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n" DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n"
<< "# content:\r\n" << parser.Content() << "\r\n" << "# content:\r\n" << parser.Content() << "\r\n"
<< "# args:\r\n" << ss.str() << "# args:\r\n" << ss.str()
<< "# response:\r\n" << "# response:\r\n"
<< contentOut << "\r\n"; << contentOut << "\r\n";
invoker(codeOut,headerOut,contentOut); invoker(codeOut,headerOut,contentOut);
} else{
DebugL << "\r\n# request:\r\n" << parser.Method() << " " << parser.FullUrl() << "\r\n"
<< "# content:\r\n" << parser.Content() << "\r\n"
<< "# args:\r\n" << ss.str()
<< "# response size:"
<< size <<"\r\n";
invoker(codeOut,headerOut,body);
}
}; };
((HttpSession::HttpResponseInvoker &)invoker) = newInvoker; ((HttpSession::HttpResponseInvoker &)invoker) = newInvoker;
} }
@ -277,6 +296,25 @@ void installWebApi() {
obj["delay"] = vecDelay[i++]; obj["delay"] = vecDelay[i++];
val["data"].append(obj); val["data"].append(obj);
} }
val["code"] = API::Success;
invoker("200 OK", headerOut, val.toStyledString());
});
});
//获取后台工作线程负载
//测试url http://127.0.0.1/index/api/getWorkThreadsLoad
API_REGIST_INVOKER(api, getWorkThreadsLoad, {
WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector<int> &vecDelay) {
Value val;
auto vec = WorkThreadPool::Instance().getExecutorLoad();
int i = 0;
for (auto load : vec) {
Value obj(objectValue);
obj["load"] = load;
obj["delay"] = vecDelay[i++];
val["data"].append(obj);
}
val["code"] = API::Success;
invoker("200 OK", headerOut, val.toStyledString()); invoker("200 OK", headerOut, val.toStyledString());
}); });
}); });
@ -313,7 +351,7 @@ void installWebApi() {
} }
if (changed > 0) { if (changed > 0) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastReloadConfig);
ini.dumpFile(); ini.dumpFile(g_ini_file);
} }
val["changed"] = changed; val["changed"] = changed;
}); });
@ -357,8 +395,6 @@ void installWebApi() {
API_REGIST(api,getMediaList,{ API_REGIST(api,getMediaList,{
CHECK_SECRET(); CHECK_SECRET();
//获取所有MediaSource列表 //获取所有MediaSource列表
val["code"] = API::Success;
val["msg"] = "success";
MediaSource::for_each_media([&](const string &schema, MediaSource::for_each_media([&](const string &schema,
const string &vhost, const string &vhost,
const string &app, const string &app,
@ -382,6 +418,13 @@ void installWebApi() {
}); });
}); });
//测试url http://127.0.0.1/index/api/isMediaOnline?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs
API_REGIST(api,isMediaOnline,{
CHECK_SECRET();
CHECK_ARGS("schema","vhost","app","stream");
val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"],false));
});
//主动关断流,包括关断拉流、推流 //主动关断流,包括关断拉流、推流
//测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
API_REGIST(api,close_stream,{ API_REGIST(api,close_stream,{
@ -394,14 +437,53 @@ void installWebApi() {
allArgs["stream"]); allArgs["stream"]);
if(src){ if(src){
bool flag = src->close(allArgs["force"].as<bool>()); bool flag = src->close(allArgs["force"].as<bool>());
val["code"] = flag ? 0 : -1; val["result"] = flag ? 0 : -1;
val["msg"] = flag ? "success" : "close failed"; val["msg"] = flag ? "success" : "close failed";
}else{ }else{
val["code"] = -2; val["result"] = -2;
val["msg"] = "can not find the stream"; val["msg"] = "can not find the stream";
} }
}); });
//批量主动关断流,包括关断拉流、推流
//测试url http://127.0.0.1/index/api/close_streams?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1
API_REGIST(api,close_streams,{
CHECK_SECRET();
//筛选命中个数
int count_hit = 0;
int count_closed = 0;
list<MediaSource::Ptr> media_list;
MediaSource::for_each_media([&](const string &schema,
const string &vhost,
const string &app,
const string &stream,
const MediaSource::Ptr &media){
if(!allArgs["schema"].empty() && allArgs["schema"] != schema){
return;
}
if(!allArgs["vhost"].empty() && allArgs["vhost"] != vhost){
return;
}
if(!allArgs["app"].empty() && allArgs["app"] != app){
return;
}
if(!allArgs["stream"].empty() && allArgs["stream"] != stream){
return;
}
++count_hit;
media_list.emplace_back(media);
});
bool force = allArgs["force"].as<bool>();
for(auto &media : media_list){
if(media->close(force)){
++count_closed;
}
}
val["count_hit"] = count_hit;
val["count_closed"] = count_closed;
});
//获取所有TcpSession列表信息 //获取所有TcpSession列表信息
//可以根据本地端口和远端ip来筛选 //可以根据本地端口和远端ip来筛选
//测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935 //测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935
@ -412,7 +494,7 @@ void installWebApi() {
string &peer_ip = allArgs["peer_ip"]; string &peer_ip = allArgs["peer_ip"];
SessionMap::Instance().for_each_session([&](const string &id,const TcpSession::Ptr &session){ SessionMap::Instance().for_each_session([&](const string &id,const TcpSession::Ptr &session){
if(local_port != API::Success && local_port != session->get_local_port()){ if(local_port != 0 && local_port != session->get_local_port()){
return; return;
} }
if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){ if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){
@ -436,13 +518,36 @@ void installWebApi() {
//踢掉tcp会话 //踢掉tcp会话
auto session = SessionMap::Instance().get(allArgs["id"]); auto session = SessionMap::Instance().get(allArgs["id"]);
if(!session){ if(!session){
val["code"] = API::OtherFailed; throw ApiRetException("can not find the target",API::OtherFailed);
val["msg"] = "can not find the target";
return;
} }
session->safeShutdown(); session->safeShutdown();
val["code"] = API::Success; });
val["msg"] = "success";
//批量断开tcp连接比如说可以断开rtsp、rtmp播放器等
//测试url http://127.0.0.1/index/api/kick_sessions?local_port=1935
API_REGIST(api,kick_sessions,{
CHECK_SECRET();
uint16_t local_port = allArgs["local_port"].as<uint16_t>();
string &peer_ip = allArgs["peer_ip"];
uint64_t count_hit = 0;
list<TcpSession::Ptr> session_list;
SessionMap::Instance().for_each_session([&](const string &id,const TcpSession::Ptr &session){
if(local_port != 0 && local_port != session->get_local_port()){
return;
}
if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){
return;
}
session_list.emplace_back(session);
++count_hit;
});
for(auto &session : session_list){
session->safeShutdown();
}
val["count_hit"] = (Json::UInt64)count_hit;
}); });
static auto addStreamProxy = [](const string &vhost, static auto addStreamProxy = [](const string &vhost,
@ -520,7 +625,7 @@ void installWebApi() {
}); });
#if !defined(_WIN32) #if !defined(_WIN32)
static auto addFFmepgSource = [](const string &src_url, static auto addFFmpegSource = [](const string &src_url,
const string &dst_url, const string &dst_url,
int timeout_ms, int timeout_ms,
const function<void(const SockException &ex,const string &key)> &cb){ const function<void(const SockException &ex,const string &key)> &cb){
@ -557,7 +662,7 @@ void installWebApi() {
auto dst_url = allArgs["dst_url"]; auto dst_url = allArgs["dst_url"];
int timeout_ms = allArgs["timeout_ms"]; int timeout_ms = allArgs["timeout_ms"];
addFFmepgSource(src_url,dst_url,timeout_ms,[invoker,val,headerOut](const SockException &ex,const string &key){ addFFmpegSource(src_url,dst_url,timeout_ms,[invoker,val,headerOut](const SockException &ex,const string &key){
if(ex){ if(ex){
const_cast<Value &>(val)["code"] = API::OtherFailed; const_cast<Value &>(val)["code"] = API::OtherFailed;
const_cast<Value &>(val)["msg"] = ex.what(); const_cast<Value &>(val)["msg"] = ex.what();
@ -568,16 +673,33 @@ void installWebApi() {
}); });
}); });
//关闭拉流代理
//测试url http://127.0.0.1/index/api/delFFmepgSource?key=key static auto api_delFFmpegSource = [](API_ARGS,const HttpSession::HttpResponseInvoker &invoker){
API_REGIST(api,delFFmepgSource,{
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx); lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx);
val["data"]["flag"] = s_ffmpegMap.erase(allArgs["key"]) == 1; val["data"]["flag"] = s_ffmpegMap.erase(allArgs["key"]) == 1;
};
//关闭拉流代理
//测试url http://127.0.0.1/index/api/delFFmepgSource?key=key
API_REGIST(api,delFFmpegSource,{
api_delFFmpegSource(API_ARGS_VALUE);
});
//此处为了兼容之前的拼写错误
API_REGIST(api,delFFmepgSource,{
api_delFFmpegSource(API_ARGS_VALUE);
}); });
#endif #endif
//新增http api下载可执行程序文件接口
//测试url http://127.0.0.1/index/api/downloadBin
API_REGIST_INVOKER(api,downloadBin,{
CHECK_SECRET();
invoker.responseFile(headerIn,StrCaseMap(),exePath());
});
////////////以下是注册的Hook API//////////// ////////////以下是注册的Hook API////////////
API_REGIST(hook,on_publish,{ API_REGIST(hook,on_publish,{
//开始推流事件 //开始推流事件
@ -636,7 +758,7 @@ void installWebApi() {
<< allArgs["stream"] << "?vhost=" << allArgs["stream"] << "?vhost="
<< allArgs["vhost"]; << allArgs["vhost"];
addFFmepgSource("http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8",/** ffmpeg拉流支持任意编码格式任意协议 **/ addFFmpegSource("http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8",/** ffmpeg拉流支持任意编码格式任意协议 **/
dst_url, dst_url,
(1000 * timeout_sec) - 500, (1000 * timeout_sec) - 500,
[invoker,val,headerOut](const SockException &ex,const string &key){ [invoker,val,headerOut](const SockException &ex,const string &key){

View File

@ -47,5 +47,7 @@ extern const string kPort;
void installWebApi(); void installWebApi();
void unInstallWebApi(); void unInstallWebApi();
//配置文件路径
extern string g_ini_file;
#endif //ZLMEDIAKIT_WEBAPI_H #endif //ZLMEDIAKIT_WEBAPI_H

View File

@ -195,7 +195,10 @@ void installWebHook(){
NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){
if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){ if(!hook_enable || args._param_strs == hook_adminparams || hook_publish.empty() || sender.get_peer_ip() == "127.0.0.1"){
invoker("",true, true,false); GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
invoker("",toRtxp,toHls,toMP4);
return; return;
} }
//异步执行该hook api防止阻塞NoticeCenter //异步执行该hook api防止阻塞NoticeCenter

View File

@ -148,7 +148,7 @@ public:
Option::ArgRequired,/*该选项后面必须跟值*/ Option::ArgRequired,/*该选项后面必须跟值*/
(exeDir() + "ssl.p12").data(),/*该选项默认值*/ (exeDir() + "ssl.p12").data(),/*该选项默认值*/
false,/*该选项是否必须赋值如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/ false,/*该选项是否必须赋值如果没有默认值且为ArgRequired时用户必须提供该参数否则将抛异常*/
"ssl证书路径,支持p12/pem类型",/*该选项说明文字*/ "ssl证书文件或文件夹,支持p12/pem类型",/*该选项说明文字*/
nullptr); nullptr);
(*_parser) << Option('t',/*该选项简称,如果是\x00则说明无简称*/ (*_parser) << Option('t',/*该选项简称,如果是\x00则说明无简称*/
@ -205,6 +205,8 @@ static void inline listen_shell_input(){
} }
#endif//!defined(_WIN32) #endif//!defined(_WIN32)
//全局变量在WebApi中用于保存配置文件用
string g_ini_file;
int start_main(int argc,char *argv[]) { int start_main(int argc,char *argv[]) {
{ {
@ -219,7 +221,7 @@ int start_main(int argc,char *argv[]) {
bool bDaemon = cmd_main.hasKey("daemon"); bool bDaemon = cmd_main.hasKey("daemon");
LogLevel logLevel = (LogLevel) cmd_main["level"].as<int>(); LogLevel logLevel = (LogLevel) cmd_main["level"].as<int>();
logLevel = MIN(MAX(logLevel, LTrace), LError); logLevel = MIN(MAX(logLevel, LTrace), LError);
static string ini_file = cmd_main["config"]; g_ini_file = cmd_main["config"];
string ssl_file = cmd_main["ssl"]; string ssl_file = cmd_main["ssl"];
int threads = cmd_main["threads"]; int threads = cmd_main["threads"];
@ -244,14 +246,21 @@ int start_main(int argc,char *argv[]) {
//启动异步日志线程 //启动异步日志线程
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>()); Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
//加载配置文件,如果配置文件不存在就创建一个 //加载配置文件,如果配置文件不存在就创建一个
loadIniConfig(ini_file.data()); loadIniConfig(g_ini_file.data());
//加载证书,证书包含公钥和私钥 if(!File::is_dir(ssl_file.data())){
//不是文件夹,加载证书,证书包含公钥和私钥
SSL_Initor::Instance().loadCertificate(ssl_file.data()); SSL_Initor::Instance().loadCertificate(ssl_file.data());
//信任某个自签名证书 }else{
SSL_Initor::Instance().trustCertificate(ssl_file.data()); //加载文件夹下的所有证书
//不忽略无效证书证书(例如自签名或过期证书) File::scanDir(ssl_file,[](const string &path, bool isDir){
SSL_Initor::Instance().ignoreInvalidCertificate(true); if(!isDir){
//最后的一个证书会当做默认证书(客户端ssl握手时未指定主机)
SSL_Initor::Instance().loadCertificate(path.data());
}
return true;
});
}
uint16_t shellPort = mINI::Instance()[Shell::kPort]; uint16_t shellPort = mINI::Instance()[Shell::kPort];
uint16_t rtspPort = mINI::Instance()[Rtsp::kPort]; uint16_t rtspPort = mINI::Instance()[Rtsp::kPort];
@ -306,7 +315,7 @@ int start_main(int argc,char *argv[]) {
});// 设置退出信号 });// 设置退出信号
#if !defined(_WIN32) #if !defined(_WIN32)
signal(SIGHUP, [](int) { mediakit::loadIniConfig(ini_file.data()); }); signal(SIGHUP, [](int) { mediakit::loadIniConfig(g_ini_file.data()); });
#endif #endif
sem.wait(); sem.wait();
} }

View File

@ -32,6 +32,7 @@
#include "Util/TimeTicker.h" #include "Util/TimeTicker.h"
#include "Extension/AAC.h" #include "Extension/AAC.h"
#include "Extension/H264.h" #include "Extension/H264.h"
#include "Extension/H265.h"
using namespace toolkit; using namespace toolkit;
@ -106,6 +107,24 @@ void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32
inputFrame(std::make_shared<H264FrameNoCacheAble>((char *)pcData,iDataLen,dts,pts,prefixeSize)); inputFrame(std::make_shared<H264FrameNoCacheAble>((char *)pcData,iDataLen,dts,pts,prefixeSize));
} }
void DevChannel::inputH265(const char* pcData, int iDataLen, uint32_t dts,uint32_t pts) {
if(dts == 0){
dts = (uint32_t)_aTicker[0].elapsedTime();
}
if(pts == 0){
pts = dts;
}
int prefixeSize;
if (memcmp("\x00\x00\x00\x01", pcData, 4) == 0) {
prefixeSize = 4;
} else if (memcmp("\x00\x00\x01", pcData, 3) == 0) {
prefixeSize = 3;
} else {
prefixeSize = 0;
}
inputFrame(std::make_shared<H265FrameNoCacheAble>((char *)pcData,iDataLen,dts,pts,prefixeSize));
}
void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) { void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) {
if(withAdtsHeader){ if(withAdtsHeader){
inputAAC(pcData+7,iDataLen-7,uiStamp,pcData); inputAAC(pcData+7,iDataLen-7,uiStamp,pcData);
@ -135,6 +154,11 @@ void DevChannel::initVideo(const VideoInfo& info) {
addTrack(std::make_shared<H264Track>()); addTrack(std::make_shared<H264Track>());
} }
void DevChannel::initH265Video(const VideoInfo &info){
_video = std::make_shared<VideoInfo>(info);
addTrack(std::make_shared<H265Track>());
}
void DevChannel::initAudio(const AudioInfo& info) { void DevChannel::initAudio(const AudioInfo& info) {
_audio = std::make_shared<AudioInfo>(info); _audio = std::make_shared<AudioInfo>(info);
addTrack(std::make_shared<AACTrack>()); addTrack(std::make_shared<AACTrack>());

View File

@ -88,6 +88,12 @@ public:
*/ */
void initVideo(const VideoInfo &info); void initVideo(const VideoInfo &info);
/**
* h265视频Track
* @param info
*/
void initH265Video(const VideoInfo &info);
/** /**
* aac音频Track * aac音频Track
* MultiMediaSourceMuxer::addTrack(AACTrack::Ptr ); * MultiMediaSourceMuxer::addTrack(AACTrack::Ptr );
@ -104,6 +110,15 @@ public:
*/ */
void inputH264(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0); void inputH264(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0);
/**
* 265
* @param pcData 265
* @param iDataLen
* @param dts 0
* @param pts 0dts
*/
void inputH265(const char *pcData, int iDataLen, uint32_t dts,uint32_t pts = 0);
/** /**
* adts头的aac帧 * adts头的aac帧
* @param pcDataWithAdts adts头的aac帧 * @param pcDataWithAdts adts头的aac帧

View File

@ -167,10 +167,15 @@ public:
} }
listener->onNoneReader(*this); listener->onNoneReader(*this);
} }
virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){ virtual void setListener(const std::weak_ptr<MediaSourceEvent> &listener){
_listener = listener; _listener = listener;
} }
std::weak_ptr<MediaSourceEvent> getListener(){
return _listener;
}
template <typename FUN> template <typename FUN>
static void for_each_media(FUN && fun){ static void for_each_media(FUN && fun){
lock_guard<recursive_mutex> lock(g_mtxMediaSrc); lock_guard<recursive_mutex> lock(g_mtxMediaSrc);

View File

@ -79,6 +79,9 @@ const string kEnableVhost = GENERAL_FIELD"enableVhost";
const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay"; const string kUltraLowDelay = GENERAL_FIELD"ultraLowDelay";
const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio"; const string kAddMuteAudio = GENERAL_FIELD"addMuteAudio";
const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay"; const string kResetWhenRePlay = GENERAL_FIELD"resetWhenRePlay";
const string kPublishToRtxp = GENERAL_FIELD"publishToRtxp";
const string kPublishToHls = GENERAL_FIELD"publishToHls";
const string kPublishToMP4 = GENERAL_FIELD"publishToMP4";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kFlowThreshold] = 1024; mINI::Instance()[kFlowThreshold] = 1024;
@ -88,6 +91,9 @@ onceToken token([](){
mINI::Instance()[kUltraLowDelay] = 1; mINI::Instance()[kUltraLowDelay] = 1;
mINI::Instance()[kAddMuteAudio] = 1; mINI::Instance()[kAddMuteAudio] = 1;
mINI::Instance()[kResetWhenRePlay] = 1; mINI::Instance()[kResetWhenRePlay] = 1;
mINI::Instance()[kPublishToRtxp] = 1;
mINI::Instance()[kPublishToHls] = 1;
mINI::Instance()[kPublishToMP4] = 0;
},nullptr); },nullptr);
}//namespace General }//namespace General
@ -95,57 +101,44 @@ onceToken token([](){
////////////HTTP配置/////////// ////////////HTTP配置///////////
namespace Http { namespace Http {
#define HTTP_FIELD "http." #define HTTP_FIELD "http."
//http 文件发送缓存大小 //http 文件发送缓存大小
#define HTTP_SEND_BUF_SIZE (64 * 1024)
const string kSendBufSize = HTTP_FIELD"sendBufSize"; const string kSendBufSize = HTTP_FIELD"sendBufSize";
//http 最大请求字节数 //http 最大请求字节数
#define HTTP_MAX_REQ_SIZE (4*1024)
const string kMaxReqSize = HTTP_FIELD"maxReqSize"; const string kMaxReqSize = HTTP_FIELD"maxReqSize";
//http keep-alive秒数 //http keep-alive秒数
#define HTTP_KEEP_ALIVE_SECOND 10
const string kKeepAliveSecond = HTTP_FIELD"keepAliveSecond"; const string kKeepAliveSecond = HTTP_FIELD"keepAliveSecond";
//http keep-alive最大请求数 //http keep-alive最大请求数
#define HTTP_MAX_REQ_CNT 100
const string kMaxReqCount = HTTP_FIELD"maxReqCount"; const string kMaxReqCount = HTTP_FIELD"maxReqCount";
//http 字符编码 //http 字符编码
#if defined(_WIN32)
#define HTTP_CHAR_SET "gb2312"
#else
#define HTTP_CHAR_SET "utf-8"
#endif
const string kCharSet = HTTP_FIELD"charSet"; const string kCharSet = HTTP_FIELD"charSet";
//http 服务器根目录 //http 服务器根目录
#define HTTP_ROOT_PATH "./httpRoot"
const string kRootPath = HTTP_FIELD"rootPath"; const string kRootPath = HTTP_FIELD"rootPath";
//http 404错误提示内容 //http 404错误提示内容
#define HTTP_NOT_FOUND "<html>"\
"<head><title>404 Not Found</title></head>"\
"<body bgcolor=\"white\">"\
"<center><h1>您访问的资源不存在!</h1></center>"\
"<hr><center>"\
SERVER_NAME\
"</center>"\
"</body>"\
"</html>"
const string kNotFound = HTTP_FIELD"notFound"; const string kNotFound = HTTP_FIELD"notFound";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kSendBufSize] = HTTP_SEND_BUF_SIZE; mINI::Instance()[kSendBufSize] = 64 * 1024;
mINI::Instance()[kMaxReqSize] = HTTP_MAX_REQ_SIZE; mINI::Instance()[kMaxReqSize] = 4*1024;
mINI::Instance()[kKeepAliveSecond] = HTTP_KEEP_ALIVE_SECOND; mINI::Instance()[kKeepAliveSecond] = 15;
mINI::Instance()[kMaxReqCount] = HTTP_MAX_REQ_CNT; mINI::Instance()[kMaxReqCount] = 100;
mINI::Instance()[kCharSet] = HTTP_CHAR_SET;
mINI::Instance()[kRootPath] = HTTP_ROOT_PATH; #if defined(_WIN32)
mINI::Instance()[kNotFound] = HTTP_NOT_FOUND; mINI::Instance()[kCharSet] = "gb2312";
#else
mINI::Instance()[kCharSet] ="utf-8";
#endif
mINI::Instance()[kRootPath] = "./httpRoot";
mINI::Instance()[kNotFound] =
"<html>"
"<head><title>404 Not Found</title></head>"
"<body bgcolor=\"white\">"
"<center><h1>您访问的资源不存在!</h1></center>"
"<hr><center>"
SERVER_NAME
"</center>"
"</body>"
"</html>";
},nullptr); },nullptr);
}//namespace Http }//namespace Http
@ -153,12 +146,10 @@ onceToken token([](){
////////////SHELL配置/////////// ////////////SHELL配置///////////
namespace Shell { namespace Shell {
#define SHELL_FIELD "shell." #define SHELL_FIELD "shell."
#define SHELL_MAX_REQ_SIZE 1024
const string kMaxReqSize = SHELL_FIELD"maxReqSize"; const string kMaxReqSize = SHELL_FIELD"maxReqSize";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kMaxReqSize] = SHELL_MAX_REQ_SIZE; mINI::Instance()[kMaxReqSize] = 1024;
},nullptr); },nullptr);
} //namespace Shell } //namespace Shell
@ -179,7 +170,6 @@ onceToken token([](){
mINI::Instance()[kDirectProxy] = 1; mINI::Instance()[kDirectProxy] = 1;
mINI::Instance()[kModifyStamp] = false; mINI::Instance()[kModifyStamp] = false;
},nullptr); },nullptr);
} //namespace Rtsp } //namespace Rtsp
////////////RTMP服务器配置/////////// ////////////RTMP服务器配置///////////
@ -194,40 +184,28 @@ onceToken token([](){
mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kHandshakeSecond] = 15;
mINI::Instance()[kKeepAliveSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15;
},nullptr); },nullptr);
} //namespace RTMP } //namespace RTMP
////////////RTP配置/////////// ////////////RTP配置///////////
namespace Rtp { namespace Rtp {
#define RTP_FIELD "rtp." #define RTP_FIELD "rtp."
//RTP打包最大MTU,公网情况下更小 //RTP打包最大MTU,公网情况下更小
#define RTP_VIDOE_MTU_SIZE 1400
const string kVideoMtuSize = RTP_FIELD"videoMtuSize"; const string kVideoMtuSize = RTP_FIELD"videoMtuSize";
#define RTP_Audio_MTU_SIZE 600
const string kAudioMtuSize = RTP_FIELD"audioMtuSize"; const string kAudioMtuSize = RTP_FIELD"audioMtuSize";
//RTP排序缓存最大个数 //RTP排序缓存最大个数
#define RTP_MAX_RTP_COUNT 50
const string kMaxRtpCount = RTP_FIELD"maxRtpCount"; const string kMaxRtpCount = RTP_FIELD"maxRtpCount";
//如果RTP序列正确次数累计达到该数字就启动清空排序缓存 //如果RTP序列正确次数累计达到该数字就启动清空排序缓存
#define RTP_CLEAR_COUNT 10
const string kClearCount = RTP_FIELD"clearCount"; const string kClearCount = RTP_FIELD"clearCount";
//最大RTP时间为13个小时每13小时回环一次 //最大RTP时间为13个小时每13小时回环一次
#define RTP_CYCLE_MS (13*60*60*1000)
const string kCycleMS = RTP_FIELD"cycleMS"; const string kCycleMS = RTP_FIELD"cycleMS";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kVideoMtuSize] = RTP_VIDOE_MTU_SIZE; mINI::Instance()[kVideoMtuSize] = 1400;
mINI::Instance()[kAudioMtuSize] = RTP_Audio_MTU_SIZE; mINI::Instance()[kAudioMtuSize] = 600;
mINI::Instance()[kMaxRtpCount] = RTP_MAX_RTP_COUNT; mINI::Instance()[kMaxRtpCount] = 50;
mINI::Instance()[kClearCount] = RTP_CLEAR_COUNT; mINI::Instance()[kClearCount] = 10;
mINI::Instance()[kCycleMS] = RTP_CYCLE_MS; mINI::Instance()[kCycleMS] = 13*60*60*1000;
},nullptr); },nullptr);
} //namespace Rtsp } //namespace Rtsp
@ -239,88 +217,67 @@ const string kAddrMin = MULTI_FIELD"addrMin";
//组播分配截止地址 //组播分配截止地址
const string kAddrMax = MULTI_FIELD"addrMax"; const string kAddrMax = MULTI_FIELD"addrMax";
//组播TTL //组播TTL
#define MULTI_UDP_TTL 64
const string kUdpTTL = MULTI_FIELD"udpTTL"; const string kUdpTTL = MULTI_FIELD"udpTTL";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kAddrMin] = "239.0.0.0"; mINI::Instance()[kAddrMin] = "239.0.0.0";
mINI::Instance()[kAddrMax] = "239.255.255.255"; mINI::Instance()[kAddrMax] = "239.255.255.255";
mINI::Instance()[kUdpTTL] = MULTI_UDP_TTL; mINI::Instance()[kUdpTTL] = 64;
},nullptr); },nullptr);
} //namespace MultiCast } //namespace MultiCast
////////////录像配置/////////// ////////////录像配置///////////
namespace Record { namespace Record {
#define RECORD_FIELD "record." #define RECORD_FIELD "record."
//查看录像的应用名称 //查看录像的应用名称
#define RECORD_APP_NAME "record"
const string kAppName = RECORD_FIELD"appName"; const string kAppName = RECORD_FIELD"appName";
//每次流化MP4文件的时长,单位毫秒 //每次流化MP4文件的时长,单位毫秒
#define RECORD_SAMPLE_MS 500
const string kSampleMS = RECORD_FIELD"sampleMS"; const string kSampleMS = RECORD_FIELD"sampleMS";
//MP4文件录制大小,默认一个小时 //MP4文件录制大小,默认一个小时
#define RECORD_FILE_SECOND (60*60)
const string kFileSecond = RECORD_FIELD"fileSecond"; const string kFileSecond = RECORD_FIELD"fileSecond";
//录制文件路径 //录制文件路径
#define RECORD_FILE_PATH HTTP_ROOT_PATH
const string kFilePath = RECORD_FIELD"filePath"; const string kFilePath = RECORD_FIELD"filePath";
//mp4文件写缓存大小 //mp4文件写缓存大小
const string kFileBufSize = RECORD_FIELD"fileBufSize"; const string kFileBufSize = RECORD_FIELD"fileBufSize";
//mp4录制完成后是否进行二次关键帧索引写入头部 //mp4录制完成后是否进行二次关键帧索引写入头部
const string kFastStart = RECORD_FIELD"fastStart"; const string kFastStart = RECORD_FIELD"fastStart";
//mp4文件是否重头循环读取 //mp4文件是否重头循环读取
const string kFileRepeat = RECORD_FIELD"fileRepeat"; const string kFileRepeat = RECORD_FIELD"fileRepeat";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kAppName] = RECORD_APP_NAME; mINI::Instance()[kAppName] = "record";
mINI::Instance()[kSampleMS] = RECORD_SAMPLE_MS; mINI::Instance()[kSampleMS] = 500;
mINI::Instance()[kFileSecond] = RECORD_FILE_SECOND; mINI::Instance()[kFileSecond] = 60*60;
mINI::Instance()[kFilePath] = RECORD_FILE_PATH; mINI::Instance()[kFilePath] = "./httpRoot";
mINI::Instance()[kFileBufSize] = 64 * 1024; mINI::Instance()[kFileBufSize] = 64 * 1024;
mINI::Instance()[kFastStart] = false; mINI::Instance()[kFastStart] = false;
mINI::Instance()[kFileRepeat] = false; mINI::Instance()[kFileRepeat] = false;
},nullptr); },nullptr);
} //namespace Record } //namespace Record
////////////HLS相关配置/////////// ////////////HLS相关配置///////////
namespace Hls { namespace Hls {
#define HLS_FIELD "hls." #define HLS_FIELD "hls."
//HLS切片时长,单位秒 //HLS切片时长,单位秒
#define HLS_SEGMENT_DURATION 3
const string kSegmentDuration = HLS_FIELD"segDur"; const string kSegmentDuration = HLS_FIELD"segDur";
//HLS切片个数 //HLS切片个数
#define HLS_SEGMENT_NUM 3
const string kSegmentNum = HLS_FIELD"segNum"; const string kSegmentNum = HLS_FIELD"segNum";
//HLS切片从m3u8文件中移除后继续保留在磁盘上的个数
const string kSegmentRetain = HLS_FIELD"segRetain";
//HLS文件写缓存大小 //HLS文件写缓存大小
#define HLS_FILE_BUF_SIZE (64 * 1024)
const string kFileBufSize = HLS_FIELD"fileBufSize"; const string kFileBufSize = HLS_FIELD"fileBufSize";
//录制文件路径 //录制文件路径
#define HLS_FILE_PATH (HTTP_ROOT_PATH)
const string kFilePath = HLS_FIELD"filePath"; const string kFilePath = HLS_FIELD"filePath";
onceToken token([](){ onceToken token([](){
mINI::Instance()[kSegmentDuration] = HLS_SEGMENT_DURATION; mINI::Instance()[kSegmentDuration] = 2;
mINI::Instance()[kSegmentNum] = HLS_SEGMENT_NUM; mINI::Instance()[kSegmentNum] = 3;
mINI::Instance()[kFileBufSize] = HLS_FILE_BUF_SIZE; mINI::Instance()[kSegmentRetain] = 5;
mINI::Instance()[kFilePath] = HLS_FILE_PATH; mINI::Instance()[kFileBufSize] = 64 * 1024;
mINI::Instance()[kFilePath] = "./httpRoot";
},nullptr); },nullptr);
} //namespace Hls } //namespace Hls
namespace Client { namespace Client {
const string kNetAdapter = "net_adapter"; const string kNetAdapter = "net_adapter";
const string kRtpType = "rtp_type"; const string kRtpType = "rtp_type";
@ -331,7 +288,6 @@ const string kTimeoutMS = "protocol_timeout_ms";
const string kMediaTimeoutMS = "media_timeout_ms"; const string kMediaTimeoutMS = "media_timeout_ms";
const string kBeatIntervalMS = "beat_interval_ms"; const string kBeatIntervalMS = "beat_interval_ms";
const string kMaxAnalysisMS = "max_analysis_ms"; const string kMaxAnalysisMS = "max_analysis_ms";
} }
} // namespace mediakit } // namespace mediakit

View File

@ -182,6 +182,12 @@ extern const string kAddMuteAudio;
//拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始, //拉流代理时如果断流再重连成功是否删除前一次的媒体流数据,如果删除将重新开始,
//如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写) //如果不删除将会接着上一次的数据继续写(录制hls/mp4时会继续在前一个文件后面写)
extern const string kResetWhenRePlay; extern const string kResetWhenRePlay;
//是否默认推流时转换成rtsp或rtmphook接口(on_publish)中可以覆盖该设置
extern const string kPublishToRtxp ;
//是否默认推流时转换成hlshook接口(on_publish)中可以覆盖该设置
extern const string kPublishToHls ;
//是否默认推流时mp4录像hook接口(on_publish)中可以覆盖该设置
extern const string kPublishToMP4 ;
}//namespace General }//namespace General
@ -283,8 +289,10 @@ extern const string kFileRepeat;
namespace Hls { namespace Hls {
//HLS切片时长,单位秒 //HLS切片时长,单位秒
extern const string kSegmentDuration; extern const string kSegmentDuration;
//HLS切片个数如果设置为0则不删除切片而是保存为点播 //m3u8文件中HLS切片个数如果设置为0则不删除切片而是保存为点播
extern const string kSegmentNum; extern const string kSegmentNum;
//HLS切片从m3u8文件中移除后继续保留在磁盘上的个数
extern const string kSegmentRetain;
//HLS文件写缓存大小 //HLS文件写缓存大小
extern const string kFileBufSize; extern const string kFileBufSize;
//录制文件路径 //录制文件路径

257
src/Http/HttpBody.cpp Normal file
View File

@ -0,0 +1,257 @@
/*
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#include "HttpBody.h"
#include "Util/util.h"
#include "Util/uv_errno.h"
#include "Util/logger.h"
#include "HttpClient.h"
#ifndef _WIN32
#include <sys/mman.h>
#endif
#ifndef _WIN32
#define ENABLE_MMAP
#endif
namespace mediakit {
HttpStringBody::HttpStringBody(const string &str){
_str = str;
}
uint64_t HttpStringBody::remainSize() {
return _str.size() - _offset;
}
Buffer::Ptr HttpStringBody::readData(uint32_t size) {
size = MIN(remainSize(),size);
if(!size){
//没有剩余字节了
return nullptr;
}
auto ret = std::make_shared<BufferString>(_str,_offset,size);
_offset += size;
return ret;
}
//////////////////////////////////////////////////////////////////
HttpFileBody::HttpFileBody(const string &filePath){
std::shared_ptr<FILE> fp(fopen(filePath.data(), "rb"), [](FILE *fp) {
if(fp){
fclose(fp);
}
});
if(!fp){
init(fp,0,0);
}else{
init(fp,0,HttpMultiFormBody::fileSize(fp.get()));
}
}
HttpFileBody::HttpFileBody(const std::shared_ptr<FILE> &fp, uint64_t offset, uint64_t max_size) {
init(fp,offset,max_size);
}
void HttpFileBody::init(const std::shared_ptr<FILE> &fp,uint64_t offset,uint64_t max_size){
_fp = fp;
_max_size = max_size;
#ifdef ENABLE_MMAP
do {
if(!_fp){
//文件不存在
break;
}
int fd = fileno(fp.get());
if (fd < 0) {
WarnL << "fileno failed:" << get_uv_errmsg(false);
break;
}
auto ptr = (char *) mmap(NULL, max_size, PROT_READ, MAP_SHARED, fd, offset);
if (ptr == MAP_FAILED) {
WarnL << "mmap failed:" << get_uv_errmsg(false);
break;
}
_map_addr.reset(ptr,[max_size,fp](char *ptr){
munmap(ptr,max_size);
});
} while (false);
#endif
if(!_map_addr && offset && fp.get()){
//未映射,那么fseek设置偏移量
fseek(fp.get(), offset, SEEK_SET);
}
}
class BufferMmap : public Buffer{
public:
typedef std::shared_ptr<BufferMmap> Ptr;
BufferMmap(const std::shared_ptr<char> &map_addr,uint64_t offset,int size){
_map_addr = map_addr;
_data = map_addr.get() + offset;
_size = size;
};
virtual ~BufferMmap(){};
//返回数据长度
char *data() const override {
return _data;
}
uint32_t size() const override{
return _size;
}
private:
std::shared_ptr<char> _map_addr;
char *_data;
uint32_t _size;
};
uint64_t HttpFileBody::remainSize() {
return _max_size - _offset;
}
Buffer::Ptr HttpFileBody::readData(uint32_t size) {
size = MIN(remainSize(),size);
if(!size){
//没有剩余字节了
return nullptr;
}
if(!_map_addr){
//fread模式
int iRead;
auto ret = _pool.obtain();
ret->setCapacity(size + 1);
do{
iRead = fread(ret->data(), 1, size, _fp.get());
}while(-1 == iRead && UV_EINTR == get_uv_error(false));
if(iRead > 0){
//读到数据了
ret->setSize(iRead);
_offset += iRead;
return std::move(ret);
}
//读取文件异常,文件真实长度小于声明长度
_offset = _max_size;
WarnL << "read file err:" << get_uv_errmsg();
return nullptr;
}
//mmap模式
auto ret = std::make_shared<BufferMmap>(_map_addr,_offset,size);
_offset += size;
return std::move(ret);
}
//////////////////////////////////////////////////////////////////
HttpMultiFormBody::HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary){
std::shared_ptr<FILE> fp(fopen(filePath.data(), "rb"), [](FILE *fp) {
if(fp){
fclose(fp);
}
});
if(!fp){
throw std::invalid_argument(StrPrinter << "open file failed" << filePath << " " << get_uv_errmsg());
}
_fileBody = std::make_shared<HttpFileBody>(fp, 0, fileSize(fp.get()));
auto fileName = filePath;
auto pos = filePath.rfind('/');
if(pos != string::npos){
fileName = filePath.substr(pos + 1);
}
_bodyPrefix = multiFormBodyPrefix(args,boundary,fileName);
_bodySuffix = multiFormBodySuffix(boundary);
_totalSize = _bodyPrefix.size() + _bodySuffix.size() + _fileBody->remainSize();
}
uint64_t HttpMultiFormBody::remainSize() {
return _totalSize - _offset;
}
Buffer::Ptr HttpMultiFormBody::readData(uint32_t size){
if(_bodyPrefix.size()){
auto ret = std::make_shared<BufferString>(_bodyPrefix);
_offset += _bodyPrefix.size();
_bodyPrefix.clear();
return ret;
}
if(_fileBody->remainSize()){
auto ret = _fileBody->readData(size);
if(!ret){
//读取文件出现异常,提前中断
_offset = _totalSize;
}else{
_offset += ret->size();
}
return ret;
}
if(_bodySuffix.size()){
auto ret = std::make_shared<BufferString>(_bodySuffix);
_offset = _totalSize;
_bodySuffix.clear();
return ret;
}
return nullptr;
}
string HttpMultiFormBody::multiFormBodySuffix(const string &boundary){
string MPboundary = string("--") + boundary;
string endMPboundary = MPboundary + "--";
_StrPrinter body;
body << "\r\n" << endMPboundary;
return body;
}
uint64_t HttpMultiFormBody::fileSize(FILE *fp) {
auto current = ftell(fp);
fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */
auto end = ftell(fp); /* 得到文件大小 */
fseek(fp,current,SEEK_SET);
return end - current;
}
string HttpMultiFormBody::multiFormContentType(const string &boundary){
return StrPrinter << "multipart/form-data; boundary=" << boundary;
}
string HttpMultiFormBody::multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName){
string MPboundary = string("--") + boundary;
_StrPrinter body;
for(auto &pr : args){
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n";
body << pr.second << "\r\n";
}
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n";
body << "Content-Type: application/octet-stream\r\n\r\n" ;
return body;
}
}//namespace mediakit

145
src/Http/HttpBody.h Normal file
View File

@ -0,0 +1,145 @@
/*
* MIT License
*
* Copyright (c) 2016-2019 xiongziliang <771730766@qq.com>
*
* This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit).
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef ZLMEDIAKIT_FILEREADER_H
#define ZLMEDIAKIT_FILEREADER_H
#include <stdlib.h>
#include <memory>
#include "Network/Buffer.h"
#include "Util/ResourcePool.h"
#include "Util/logger.h"
using namespace std;
using namespace toolkit;
#ifndef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b) )
#endif //MIN
namespace mediakit {
/**
* http content部分基类定义
*/
class HttpBody{
public:
typedef std::shared_ptr<HttpBody> Ptr;
HttpBody(){}
virtual ~HttpBody(){}
/**
*
*/
virtual uint64_t remainSize() { return 0;};
/**
* size
* @param size
* @return
*/
virtual Buffer::Ptr readData(uint32_t size) { return nullptr;};
};
/**
* string类型的content
*/
class HttpStringBody : public HttpBody{
public:
typedef std::shared_ptr<HttpStringBody> Ptr;
HttpStringBody(const string &str);
virtual ~HttpStringBody(){}
uint64_t remainSize() override ;
Buffer::Ptr readData(uint32_t size) override ;
private:
mutable string _str;
uint64_t _offset = 0;
};
/**
* content
*/
class HttpFileBody : public HttpBody{
public:
typedef std::shared_ptr<HttpFileBody> Ptr;
/**
*
* @param fp 0
* @param offset
* @param max_size
*/
HttpFileBody(const std::shared_ptr<FILE> &fp,uint64_t offset,uint64_t max_size);
HttpFileBody(const string &file_path);
~HttpFileBody(){};
uint64_t remainSize() override ;
Buffer::Ptr readData(uint32_t size) override;
private:
void init(const std::shared_ptr<FILE> &fp,uint64_t offset,uint64_t max_size);
private:
std::shared_ptr<FILE> _fp;
uint64_t _max_size;
uint64_t _offset = 0;
std::shared_ptr<char> _map_addr;
ResourcePool<BufferRaw> _pool;
};
class HttpArgs;
/**
* http MultiForm http content
*/
class HttpMultiFormBody : public HttpBody {
public:
typedef std::shared_ptr<HttpMultiFormBody> Ptr;
/**
*
* @param args http提交参数列表
* @param filePath
* @param boundary boundary字符串
*/
HttpMultiFormBody(const HttpArgs &args,const string &filePath,const string &boundary = "0xKhTmLbOuNdArY");
virtual ~HttpMultiFormBody(){}
uint64_t remainSize() override ;
Buffer::Ptr readData(uint32_t size) override;
public:
static string multiFormBodyPrefix(const HttpArgs &args,const string &boundary,const string &fileName);
static string multiFormBodySuffix(const string &boundary);
static uint64_t fileSize(FILE *fp);
static string multiFormContentType(const string &boundary);
private:
string _bodyPrefix;
string _bodySuffix;
uint64_t _offset = 0;
uint64_t _totalSize;
HttpFileBody::Ptr _fileBody;
};
}//namespace mediakit
#endif //ZLMEDIAKIT_FILEREADER_H

View File

@ -242,8 +242,9 @@ void HttpClient::onRecvContent(const char *data, uint64_t len) {
void HttpClient::onFlush() { void HttpClient::onFlush() {
_aliveTicker.resetTime(); _aliveTicker.resetTime();
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
while (_body && _body->remainSize() && !isSocketBusy()) { while (_body && _body->remainSize() && !isSocketBusy()) {
auto buffer = _body->readData(); auto buffer = _body->readData(sendBufSize);
if (!buffer) { if (!buffer) {
//数据发送结束或读取数据异常 //数据发送结束或读取数据异常
break; break;

View File

@ -39,7 +39,7 @@
#include "HttpCookie.h" #include "HttpCookie.h"
#include "HttpChunkedSplitter.h" #include "HttpChunkedSplitter.h"
#include "strCoding.h" #include "strCoding.h"
#include "HttpBody.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -64,145 +64,6 @@ public:
} }
}; };
class HttpBody{
public:
typedef std::shared_ptr<HttpBody> Ptr;
HttpBody(){}
virtual ~HttpBody(){}
//剩余数据大小
virtual uint64_t remainSize() = 0;
virtual Buffer::Ptr readData() = 0;
};
class HttpStringBody : public HttpBody{
public:
typedef std::shared_ptr<HttpStringBody> Ptr;
HttpStringBody(const string &str){
_str = str;
}
virtual ~HttpStringBody(){}
uint64_t remainSize() override {
return _str.size();
}
Buffer::Ptr readData() override {
auto ret = std::make_shared<BufferString>(_str);
_str.clear();
return ret;
}
private:
mutable string _str;
};
class HttpMultiFormBody : public HttpBody {
public:
typedef std::shared_ptr<HttpMultiFormBody> Ptr;
template<typename MapType>
HttpMultiFormBody(const MapType &args,const string &filePath,const string &boundary,uint32_t sliceSize = 4 * 1024){
_fp = fopen(filePath.data(),"rb");
if(!_fp){
throw std::invalid_argument(StrPrinter << "打开文件失败:" << filePath << " " << get_uv_errmsg());
}
auto fileName = filePath;
auto pos = filePath.rfind('/');
if(pos != string::npos){
fileName = filePath.substr(pos + 1);
}
_bodyPrefix = multiFormBodyPrefix(args,boundary,fileName);
_bodySuffix = multiFormBodySuffix(boundary);
_totalSize = _bodyPrefix.size() + _bodySuffix.size() + fileSize(_fp);
_sliceSize = sliceSize;
}
virtual ~HttpMultiFormBody(){
fclose(_fp);
}
uint64_t remainSize() override {
return _totalSize - _offset;
}
Buffer::Ptr readData() override{
if(_bodyPrefix.size()){
auto ret = std::make_shared<BufferString>(_bodyPrefix);
_offset += _bodyPrefix.size();
_bodyPrefix.clear();
return ret;
}
if(0 == feof(_fp)){
auto ret = std::make_shared<BufferRaw>(_sliceSize);
//读文件
int size;
do{
size = fread(ret->data(),1,_sliceSize,_fp);
}while(-1 == size && UV_EINTR == get_uv_error(false));
if(size == -1){
_offset = _totalSize;
WarnL << "fread failed:" << get_uv_errmsg();
return nullptr;
}
_offset += size;
ret->setSize(size);
return ret;
}
if(_bodySuffix.size()){
auto ret = std::make_shared<BufferString>(_bodySuffix);
_offset = _totalSize;
_bodySuffix.clear();
return ret;
}
return nullptr;
}
public:
template<typename MapType>
static string multiFormBodyPrefix(const MapType &args,const string &boundary,const string &fileName){
string MPboundary = string("--") + boundary;
_StrPrinter body;
for(auto &pr : args){
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << pr.first << "\"\r\n\r\n";
body << pr.second << "\r\n";
}
body << MPboundary << "\r\n";
body << "Content-Disposition: form-data; name=\"" << "file" << "\";filename=\"" << fileName << "\"\r\n";
body << "Content-Type: application/octet-stream\r\n\r\n" ;
return body;
}
static string multiFormBodySuffix(const string &boundary){
string MPboundary = string("--") + boundary;
string endMPboundary = MPboundary + "--";
_StrPrinter body;
body << "\r\n" << endMPboundary;
return body;
}
static uint64_t fileSize(FILE *fp) {
auto current = ftell(fp);
fseek(fp,0L,SEEK_END); /* 定位到文件末尾 */
auto end = ftell(fp); /* 得到文件大小 */
fseek(fp,current,SEEK_SET);
return end - current;
}
static string multiFormContentType(const string &boundary){
return StrPrinter << "multipart/form-data; boundary=" << boundary;
}
private:
FILE *_fp;
string _bodyPrefix;
string _bodySuffix;
uint64_t _offset = 0;
uint64_t _totalSize;
uint32_t _sliceSize;
};
class HttpClient : public TcpClient , public HttpRequestSplitter class HttpClient : public TcpClient , public HttpRequestSplitter
{ {
public: public:

View File

@ -59,40 +59,43 @@ string dateStr() {
strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt)); strftime(buf, sizeof buf, "%a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));
return buf; return buf;
} }
static const char*
get_mime_type(const char* name) { const char *HttpSession::get_mime_type(const char *name) {
const char* dot; const char *dot;
dot = strrchr(name, '.'); dot = strrchr(name, '.');
static HttpSession::KeyValue mapType; static HttpSession::KeyValue mapType;
static onceToken token([&]() { static onceToken token([&]() {
mapType.emplace(".html","text/html"); mapType.emplace(".html", "text/html");
mapType.emplace(".htm","text/html"); mapType.emplace(".htm", "text/html");
mapType.emplace(".mp4","video/mp4"); mapType.emplace(".mp4", "video/mp4");
mapType.emplace(".m3u8","application/vnd.apple.mpegurl"); mapType.emplace(".mkv", "video/x-matroska");
mapType.emplace(".jpg","image/jpeg"); mapType.emplace(".rmvb", "application/vnd.rn-realmedia");
mapType.emplace(".jpeg","image/jpeg"); mapType.emplace(".rm", "application/vnd.rn-realmedia");
mapType.emplace(".gif","image/gif"); mapType.emplace(".m3u8", "application/vnd.apple.mpegurl");
mapType.emplace(".png","image/png"); mapType.emplace(".jpg", "image/jpeg");
mapType.emplace(".ico","image/x-icon"); mapType.emplace(".jpeg", "image/jpeg");
mapType.emplace(".css","text/css"); mapType.emplace(".gif", "image/gif");
mapType.emplace(".js","application/javascript"); mapType.emplace(".png", "image/png");
mapType.emplace(".au","audio/basic"); mapType.emplace(".ico", "image/x-icon");
mapType.emplace(".wav","audio/wav"); mapType.emplace(".css", "text/css");
mapType.emplace(".avi","video/x-msvideo"); mapType.emplace(".js", "application/javascript");
mapType.emplace(".mov","video/quicktime"); mapType.emplace(".au", "audio/basic");
mapType.emplace(".qt","video/quicktime"); mapType.emplace(".wav", "audio/wav");
mapType.emplace(".mpeg","video/mpeg"); mapType.emplace(".avi", "video/x-msvideo");
mapType.emplace(".mpe","video/mpeg"); mapType.emplace(".mov", "video/quicktime");
mapType.emplace(".vrml","model/vrml"); mapType.emplace(".qt", "video/quicktime");
mapType.emplace(".wrl","model/vrml"); mapType.emplace(".mpeg", "video/mpeg");
mapType.emplace(".midi","audio/midi"); mapType.emplace(".mpe", "video/mpeg");
mapType.emplace(".mid","audio/midi"); mapType.emplace(".vrml", "model/vrml");
mapType.emplace(".mp3","audio/mpeg"); mapType.emplace(".wrl", "model/vrml");
mapType.emplace(".ogg","application/ogg"); mapType.emplace(".midi", "audio/midi");
mapType.emplace(".pac","application/x-ns-proxy-autoconfig"); mapType.emplace(".mid", "audio/midi");
mapType.emplace(".flv","video/x-flv"); mapType.emplace(".mp3", "audio/mpeg");
mapType.emplace(".ogg", "application/ogg");
mapType.emplace(".pac", "application/x-ns-proxy-autoconfig");
mapType.emplace(".flv", "video/x-flv");
}, nullptr); }, nullptr);
if(!dot){ if (!dot) {
return "text/plain"; return "text/plain";
} }
auto it = mapType.find(dot); auto it = mapType.find(dot);
@ -102,6 +105,92 @@ get_mime_type(const char* name) {
return it->second.data(); return it->second.data();
} }
////////////////////////////////////////////////////////////////////////////////////////////////////
void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{
if(_lambad){
_lambad(codeOut,headerOut,body);
}
}
void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{
this->operator()(codeOut,headerOut,std::make_shared<HttpStringBody>(body));
}
HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){
_lambad = lambda;
}
HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){
if(!lambda){
_lambad = nullptr;
return;
}
_lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){
string str;
if(body && body->remainSize()){
str = body->readData(body->remainSize())->toString();
}
lambda(codeOut,headerOut,str);
};
}
void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader,
const StrCaseMap &responseHeader,
const string &filePath) const {
StrCaseMap &httpHeader = const_cast<StrCaseMap&>(responseHeader);
do {
std::shared_ptr<FILE> fp(fopen(filePath.data(), "rb"), [](FILE *fp) {
if (fp) {
fclose(fp);
}
});
if (!fp) {
//打开文件失败
break;
}
auto &strRange = const_cast<StrCaseMap &>(requestHeader)["Range"];
int64_t iRangeStart = 0;
int64_t iRangeEnd = 0 ;
int64_t fileSize = HttpMultiFormBody::fileSize(fp.get());
const char *pcHttpResult = NULL;
if (strRange.size() == 0) {
//全部下载
pcHttpResult = "200 OK";
iRangeEnd = fileSize - 1;
} else {
//分节下载
pcHttpResult = "206 Partial Content";
iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data());
iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data());
if (iRangeEnd == 0) {
iRangeEnd = fileSize - 1;
}
//分节下载返回Content-Range头
httpHeader.emplace("Content-Range", StrPrinter << "bytes " << iRangeStart << "-" << iRangeEnd << "/" << fileSize << endl);
}
//回复文件
HttpBody::Ptr fileBody = std::make_shared<HttpFileBody>(fp, iRangeStart, iRangeEnd - iRangeStart + 1);
(*this)(pcHttpResult, httpHeader, fileBody);
return;
}while(false);
GET_CONFIG(string,notFound,Http::kNotFound);
GET_CONFIG(string,charSet,Http::kCharSet);
auto strContentType = StrPrinter << "text/html; charset=" << charSet << endl;
httpHeader["Content-Type"] = strContentType;
(*this)("404 Not Found", httpHeader, notFound);
}
HttpResponseInvokerImp::operator bool(){
return _lambad.operator bool();
}
////////////////////////////////////////////////////////////////////////////////////////////////////
HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) {
TraceP(this); TraceP(this);
@ -128,7 +217,7 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) {
string cmd = _parser.Method(); string cmd = _parser.Method();
auto it = g_mapCmdIndex.find(cmd); auto it = g_mapCmdIndex.find(cmd);
if (it == g_mapCmdIndex.end()) { if (it == g_mapCmdIndex.end()) {
sendResponse("403 Forbidden", makeHttpHeader(true), ""); sendResponse("403 Forbidden", true);
shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd)); shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd));
return 0; return 0;
} }
@ -169,11 +258,13 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
} }
void HttpSession::onError(const SockException& err) { void HttpSession::onError(const SockException& err) {
if(_ticker.createdTime() < 10 * 1000){ if(_is_flv_stream){
TraceP(this) << err.what(); //flv播放器
}else{ WarnP(this) << "播放器("
WarnP(this) << err.what(); << _mediaInfo._vhost << "/"
} << _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< ")断开:" << err.what();
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
@ -184,6 +275,15 @@ void HttpSession::onError(const SockException& err) {
true, true,
*this); *this);
} }
return;
}
//http客户端
if(_ticker.createdTime() < 10 * 1000){
TraceP(this) << err.what();
}else{
WarnP(this) << err.what();
}
} }
void HttpSession::onManager() { void HttpSession::onManager() {
@ -202,7 +302,7 @@ bool HttpSession::checkWebSocket(){
} }
auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
KeyValue headerOut = makeHttpHeader(); KeyValue headerOut;
headerOut["Upgrade"] = "websocket"; headerOut["Upgrade"] = "websocket";
headerOut["Connection"] = "Upgrade"; headerOut["Connection"] = "Upgrade";
headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept; headerOut["Sec-WebSocket-Accept"] = Sec_WebSocket_Accept;
@ -212,7 +312,7 @@ bool HttpSession::checkWebSocket(){
auto res_cb = [this,headerOut](){ auto res_cb = [this,headerOut](){
_flv_over_websocket = true; _flv_over_websocket = true;
sendResponse("101 Switching Protocols",headerOut,""); sendResponse("101 Switching Protocols",false,nullptr,headerOut,nullptr,false);
}; };
//判断是否为websocket-flv //判断是否为websocket-flv
@ -223,11 +323,11 @@ bool HttpSession::checkWebSocket(){
//如果checkLiveFlvStream返回false,则代表不是websocket-flv而是普通的websocket连接 //如果checkLiveFlvStream返回false,则代表不是websocket-flv而是普通的websocket连接
if(!onWebSocketConnect(_parser)){ if(!onWebSocketConnect(_parser)){
sendResponse("501 Not Implemented",headerOut,""); sendResponse("501 Not Implemented",true, nullptr, headerOut);
shutdown(SockException(Err_shutdown,"WebSocket server not implemented")); shutdown(SockException(Err_shutdown,"WebSocket server not implemented"));
return true; return true;
} }
sendResponse("101 Switching Protocols",headerOut,""); sendResponse("101 Switching Protocols",false, nullptr,headerOut);
return true; return true;
} }
//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 //http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2
@ -274,14 +374,14 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
auto onRes = [this,rtmp_src,cb](const string &err){ auto onRes = [this,rtmp_src,cb](const string &err){
bool authSuccess = err.empty(); bool authSuccess = err.empty();
if(!authSuccess){ if(!authSuccess){
sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); sendResponse("401 Unauthorized", true, nullptr, KeyValue(), std::make_shared<HttpStringBody>(err));
shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err));
return ; return ;
} }
if(!cb) { if(!cb) {
//找到rtmp源发送http头负载后续发送 //找到rtmp源发送http头负载后续发送
sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), ""); sendResponse("200 OK", false, "video/x-flv",KeyValue(),nullptr,false);
}else{ }else{
cb(); cb();
} }
@ -291,6 +391,7 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
try{ try{
start(getPoller(),rtmp_src); start(getPoller(),rtmp_src);
_is_flv_stream = true;
}catch (std::exception &ex){ }catch (std::exception &ex){
//该rtmp源不存在 //该rtmp源不存在
shutdown(SockException(Err_shutdown,"rtmp mediasource released")); shutdown(SockException(Err_shutdown,"rtmp mediasource released"));
@ -485,14 +586,13 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
return; return;
} }
//先看看该http事件是否被拦截
if(emitHttpEvent(false)){ if(emitHttpEvent(false)){
//拦截http api事件
return; return;
} }
//再看看是否为http-flv直播请求
if(checkLiveFlvStream()){ if(checkLiveFlvStream()){
//若是return //拦截http-flv播放器
return; return;
} }
@ -530,150 +630,38 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) {
if(!errMsg.empty()){ if(!errMsg.empty()){
const_cast<string &>(strMeun) = errMsg; const_cast<string &>(strMeun) = errMsg;
} }
auto headerOut = makeHttpHeader(bClose,strMeun.size()); KeyValue headerOut;
if(cookie){ if(cookie){
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" , headerOut, strMeun); sendResponse(errMsg.empty() ? "200 OK" : "401 Unauthorized" ,bClose, "text/html", headerOut, std::make_shared<HttpStringBody>(strMeun));
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder"); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access folder");
}); });
return; return;
} }
}while(0); }while(0);
//访问的是文件
struct stat tFileStat;
if (0 != stat(strFile.data(), &tFileStat)) {
//文件不存在
sendNotFound(bClose);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on file");
}
//文件智能指针,防止退出时未关闭
std::shared_ptr<FILE> pFilePtr(fopen(strFile.data(), "rb"), [](FILE *pFile) {
if(pFile){
fclose(pFile);
}
});
if (!pFilePtr) {
//打开文件失败
sendNotFound(bClose);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after send 404 not found on open file failed");
}
auto parser = _parser; auto parser = _parser;
//判断是否有权限访问该文件 //判断是否有权限访问该文件
canAccessPath(_parser.Url(),false,[this,parser,tFileStat,pFilePtr,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){ canAccessPath(_parser.Url(),false,[this,parser,bClose,strFile](const string &errMsg,const HttpServerCookie::Ptr &cookie){
if(!errMsg.empty()){ if(!errMsg.empty()){
auto headerOut = makeHttpHeader(bClose,errMsg.size()); KeyValue headerOut;
if(cookie){ if(cookie){
headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
sendResponse("401 Unauthorized" , headerOut, errMsg); sendResponse("401 Unauthorized" ,bClose, nullptr, headerOut, std::make_shared<HttpStringBody>(errMsg));
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed"); throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file failed");
} }
//判断是不是分节下载 KeyValue httpHeader;
auto &strRange = parser["Range"];
int64_t iRangeStart = 0, iRangeEnd = 0;
iRangeStart = atoll(FindField(strRange.data(), "bytes=", "-").data());
iRangeEnd = atoll(FindField(strRange.data(), "-", "\r\n").data());
if (iRangeEnd == 0) {
iRangeEnd = tFileStat.st_size - 1;
}
auto httpHeader = makeHttpHeader(bClose, iRangeEnd - iRangeStart + 1, get_mime_type(strFile.data()));
const char *pcHttpResult = NULL;
if (strRange.size() == 0) {
//全部下载
pcHttpResult = "200 OK";
} else {
//分节下载
pcHttpResult = "206 Partial Content";
fseek(pFilePtr.get(), iRangeStart, SEEK_SET);
//分节下载返回Content-Range头
httpHeader.emplace("Content-Range",StrPrinter<<"bytes " << iRangeStart << "-" << iRangeEnd << "/" << tFileStat.st_size<< endl);
}
if(cookie){ if(cookie){
httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>()); httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get<string>());
} }
//先回复HTTP头部分
sendResponse(pcHttpResult,httpHeader,"");
if (iRangeEnd - iRangeStart < 0) { HttpResponseInvoker invoker = [this,bClose,&strFile](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){
//文件是空的! sendResponse(codeOut.data(), bClose, get_mime_type(strFile.data()), headerOut, body);
throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file");
}
//回复Content部分
std::shared_ptr<int64_t> piLeft(new int64_t(iRangeEnd - iRangeStart + 1));
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [pFilePtr,bClose,weakSelf,piLeft]() {
TimeTicker();
auto strongSelf = weakSelf.lock();
while(*piLeft && strongSelf){
//更新超时计时器
strongSelf->_ticker.resetTime();
//从循环池获取一个内存片
auto sendBuf = strongSelf->obtainBuffer();
sendBuf->setCapacity(sendBufSize);
//本次需要读取文件字节数
int64_t iReq = MIN(sendBufSize,*piLeft);
//读文件
int iRead;
do{
iRead = fread(sendBuf->data(), 1, iReq, pFilePtr.get());
}while(-1 == iRead && UV_EINTR == get_uv_error(false));
//文件剩余字节数
*piLeft -= iRead;
if (iRead < iReq || !*piLeft) {
//文件读完
if(iRead > 0) {
sendBuf->setSize(iRead);
strongSelf->send(sendBuf);
}
if(strongSelf->isSocketBusy()){
//套接字忙,我们等待触发下一次onFlush事件
//待所有数据flush到socket fd再移除onFlush事件监听
//标记文件读写完毕
*piLeft = 0;
return true;
}
//文件全部flush到socket fd可以直接关闭socket了
break;
}
//文件还未读完
sendBuf->setSize(iRead);
if(strongSelf->send(sendBuf) == -1) {
//socket已经销毁不再监听onFlush事件
return false;
}
if(strongSelf->isSocketBusy()){
//socket忙那么停止继续写,等待下一次onFlush事件然后再读文件写socket
return true;
}
//socket还可写继续写socket
}
if(bClose && strongSelf) {
//最后一次flush事件文件也发送完毕了可以关闭socket了
strongSelf->shutdown(SockException(Err_shutdown,"read file eof"));
}
//不再监听onFlush事件
return false;
}; };
invoker.responseFile(parser.getValues(),httpHeader,strFile);
//文件下载提升发送性能
setSocketFlags();
onFlush();
_sock->setOnFlush(onFlush);
}); });
} }
@ -776,43 +764,137 @@ bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet)
return true; return true;
} }
void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) {
_StrPrinter printer;
printer << "HTTP/1.1 " << pcStatus << "\r\n";
for (auto &pr : header) {
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n" << strContent;
auto strSend = printer << endl;
send(strSend);
_ticker.resetTime();
}
HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { void HttpSession::sendResponse(const char *pcStatus,
KeyValue headerOut; bool bClose,
const char *pcContentType,
const HttpSession::KeyValue &header,
const HttpBody::Ptr &body,
bool set_content_len ){
GET_CONFIG(string,charSet,Http::kCharSet); GET_CONFIG(string,charSet,Http::kCharSet);
GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond); GET_CONFIG(uint32_t,keepAliveSec,Http::kKeepAliveSecond);
GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount); GET_CONFIG(uint32_t,reqCnt,Http::kMaxReqCount);
//body默认为空
int64_t size = 0;
if (body && body->remainSize()) {
//有body获取body大小
size = body->remainSize();
if (size >= INT64_MAX) {
//不固定长度的body那么不设置content-length字段
size = -1;
}
}
if(!set_content_len || size == -1){
//如果是不定长度body或者不设置conten-length,
//那么一定是Keep-Alive类型
bClose = false;
}
HttpSession::KeyValue &headerOut = const_cast<HttpSession::KeyValue &>(header);
headerOut.emplace("Date", dateStr()); headerOut.emplace("Date", dateStr());
headerOut.emplace("Server", SERVER_NAME); headerOut.emplace("Server", SERVER_NAME);
headerOut.emplace("Connection", bClose ? "close" : "keep-alive"); headerOut.emplace("Connection", bClose ? "close" : "keep-alive");
if(!bClose){ if(!bClose){
headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl); headerOut.emplace("Keep-Alive",StrPrinter << "timeout=" << keepAliveSec << ", max=" << reqCnt << endl);
} }
if(pcContentType){
auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl;
headerOut.emplace("Content-Type",strContentType);
}
if(iContentSize > 0){
headerOut.emplace("Content-Length", StrPrinter<<iContentSize<<endl);
}
if(!_origin.empty()){ if(!_origin.empty()){
//设置跨域
headerOut.emplace("Access-Control-Allow-Origin",_origin); headerOut.emplace("Access-Control-Allow-Origin",_origin);
headerOut.emplace("Access-Control-Allow-Credentials", "true"); headerOut.emplace("Access-Control-Allow-Credentials", "true");
} }
return headerOut;
if(set_content_len && size >= 0){
//文件长度为定值或者,且不是http-flv强制设置Content-Length
headerOut["Content-Length"] = StrPrinter << size << endl;
}
if(size && !pcContentType){
//有body时设置缺省类型
pcContentType = "text/plain";
}
if(size && pcContentType){
//有body时设置文件类型
auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl;
headerOut.emplace("Content-Type",strContentType);
}
//发送http头
_StrPrinter printer;
printer << "HTTP/1.1 " << pcStatus << "\r\n";
for (auto &pr : header) {
printer << pr.first << ": " << pr.second << "\r\n";
}
printer << "\r\n";
send(printer << endl);
_ticker.resetTime();
if(!size){
//没有body
if(bClose){
shutdown(SockException(Err_shutdown,"close connection after send http header completed"));
}
return;
}
//发送http body
GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [body,bClose,weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf){
//本对象已经销毁
return false;
}
while(true){
//更新超时计时器
strongSelf->_ticker.resetTime();
//读取文件
auto sendBuf = body->readData(sendBufSize);
if (!sendBuf) {
//文件读完
if(strongSelf->isSocketBusy() && bClose){
//套接字忙,我们等待触发下一次onFlush事件
//待所有数据flush到socket fd再移除onFlush事件监听
//标记文件读写完毕
return true;
}
//文件全部flush到socket fd可以直接关闭socket了
break;
}
//文件还未读完
if(strongSelf->send(sendBuf) == -1) {
//socket已经销毁不再监听onFlush事件
return false;
}
if(strongSelf->isSocketBusy()){
//socket忙那么停止继续写,等待下一次onFlush事件然后再读文件写socket
return true;
}
//socket还可写继续写socket
}
if(bClose) {
//最后一次flush事件文件也发送完毕了可以关闭socket了
strongSelf->shutdown(SockException(Err_shutdown,"close connection after send http body completed"));
}
//不再监听onFlush事件
return false;
};
if(body->remainSize() > sendBufSize){
//文件下载提升发送性能
setSocketFlags();
}
onFlush();
_sock->setOnFlush(onFlush);
} }
string HttpSession::urlDecode(const string &str){ string HttpSession::urlDecode(const string &str){
@ -841,20 +923,25 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt); bool bClose = (strcasecmp(_parser["Connection"].data(),"close") == 0) || ( ++_iReqCnt > reqCnt);
/////////////////////异步回复Invoker/////////////////////////////// /////////////////////异步回复Invoker///////////////////////////////
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this()); weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const string &contentOut){ HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
return; return;
} }
strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() { strongSelf->async([weakSelf,bClose,codeOut,headerOut,body]() {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if(!strongSelf) { if(!strongSelf) {
//本对象已经销毁
return; return;
} }
strongSelf->responseDelay(bClose,codeOut,headerOut,contentOut);
if(bClose){ if(codeOut.empty()){
strongSelf->shutdown(SockException(Err_shutdown,"Connection: close")); //回调提供的参数异常
strongSelf->sendNotFound(bClose);
return;
} }
strongSelf->sendResponse(codeOut.data(), bClose, nullptr, headerOut, body);
}); });
}; };
///////////////////广播HTTP事件/////////////////////////// ///////////////////广播HTTP事件///////////////////////////
@ -862,7 +949,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this); NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this);
if(!consumed && doInvoke){ if(!consumed && doInvoke){
//该事件无人消费所以返回404 //该事件无人消费所以返回404
invoker("404 Not Found",KeyValue(),""); invoker("404 Not Found",KeyValue(), HttpBody::Ptr());
if(bClose){ if(bClose){
//close类型回复完毕关闭连接 //close类型回复完毕关闭连接
shutdown(SockException(Err_shutdown,"404 Not Found")); shutdown(SockException(Err_shutdown,"404 Not Found"));
@ -943,25 +1030,10 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) {
} }
//有后续content数据要处理,暂时不关闭连接 //有后续content数据要处理,暂时不关闭连接
} }
void HttpSession::responseDelay(bool bClose,
const string &codeOut,
const KeyValue &headerOut,
const string &contentOut){
if(codeOut.empty()){
sendNotFound(bClose);
return;
}
auto headerOther = makeHttpHeader(bClose,contentOut.size(),"text/plain");
for (auto &pr : headerOther){
//添加默认http头默认http头不能覆盖用户自定义的头
const_cast<KeyValue &>(headerOut).emplace(pr.first,pr.second);
}
sendResponse(codeOut.data(), headerOut, contentOut);
}
void HttpSession::sendNotFound(bool bClose) { void HttpSession::sendNotFound(bool bClose) {
GET_CONFIG(string,notFound,Http::kNotFound); GET_CONFIG(string,notFound,Http::kNotFound);
sendResponse("404 Not Found", makeHttpHeader(bClose, notFound.size()), notFound); sendResponse("404 Not Found", bClose,"text/html",KeyValue(),std::make_shared<HttpStringBody>(notFound));
} }
void HttpSession::setSocketFlags(){ void HttpSession::setSocketFlags(){

View File

@ -36,21 +36,44 @@
#include "HttpRequestSplitter.h" #include "HttpRequestSplitter.h"
#include "WebSocketSplitter.h" #include "WebSocketSplitter.h"
#include "HttpCookieManager.h" #include "HttpCookieManager.h"
#include "HttpBody.h"
#include "Util/function_traits.h"
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
namespace mediakit { namespace mediakit {
/**
*
*/
class HttpResponseInvokerImp{
public:
typedef std::function<void(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body)> HttpResponseInvokerLambda0;
typedef std::function<void(const string &codeOut, const StrCaseMap &headerOut, const string &body)> HttpResponseInvokerLambda1;
HttpResponseInvokerImp(){}
~HttpResponseInvokerImp(){}
template<typename C>
HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename function_traits<C>::stl_function_type(c)) {}
HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda);
HttpResponseInvokerImp(const HttpResponseInvokerLambda1 &lambda);
void operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const;
void operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const;
void responseFile(const StrCaseMap &requestHeader,const StrCaseMap &responseHeader,const string &filePath) const;
operator bool();
private:
HttpResponseInvokerLambda0 _lambad;
};
class HttpSession: public TcpSession, class HttpSession: public TcpSession,
public FlvMuxer, public FlvMuxer,
public HttpRequestSplitter, public HttpRequestSplitter,
public WebSocketSplitter { public WebSocketSplitter {
public: public:
typedef StrCaseMap KeyValue; typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut, typedef HttpResponseInvokerImp HttpResponseInvoker;
const KeyValue &headerOut,
const string &contentOut)> HttpResponseInvoker;
/** /**
* @param errMsg * @param errMsg
@ -67,6 +90,7 @@ public:
virtual void onManager() override; virtual void onManager() override;
static string urlDecode(const string &str); static string urlDecode(const string &str);
static const char* get_mime_type(const char* name);
protected: protected:
//FlvMuxer override //FlvMuxer override
void onWrite(const Buffer::Ptr &data) override ; void onWrite(const Buffer::Ptr &data) override ;
@ -118,13 +142,9 @@ private:
bool emitHttpEvent(bool doInvoke); bool emitHttpEvent(bool doInvoke);
void urlDecode(Parser &parser); void urlDecode(Parser &parser);
void sendNotFound(bool bClose); void sendNotFound(bool bClose);
void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); void sendResponse(const char *pcStatus, bool bClose, const char *pcContentType = nullptr,
KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); const HttpSession::KeyValue &header = HttpSession::KeyValue(),
void responseDelay(bool bClose, const HttpBody::Ptr &body = nullptr,bool set_content_len = true);
const string &codeOut,
const KeyValue &headerOut,
const string &contentOut);
/** /**
* http客户端是否有权限访问文件的逻辑步骤 * http客户端是否有权限访问文件的逻辑步骤
* *
@ -160,6 +180,7 @@ private:
//处理content数据的callback //处理content数据的callback
function<bool (const char *data,uint64_t len) > _contentCallBack; function<bool (const char *data,uint64_t len) > _contentCallBack;
bool _flv_over_websocket = false; bool _flv_over_websocket = false;
bool _is_flv_stream = false;
}; };

View File

@ -93,13 +93,14 @@ void HlsMaker::delOldSegment() {
return; return;
} }
//在hls m3u8索引文件中,我们保存的切片个数跟_seg_number相关设置一致 //在hls m3u8索引文件中,我们保存的切片个数跟_seg_number相关设置一致
if (_file_index >= _seg_number + 2) { if (_file_index > _seg_number) {
_seg_dur_list.pop_front(); _seg_dur_list.pop_front();
} }
//但是实际保存的切片个数比m3u8所述多两个,这样做的目的是防止播放器在切片删除前能下载完毕 GET_CONFIG(uint32_t,segRetain,Hls::kSegmentRetain);
if (_file_index >= _seg_number + 4) { //但是实际保存的切片个数比m3u8所述多若干个,这样做的目的是防止播放器在切片删除前能下载完毕
onDelSegment(_file_index - _seg_number - 4); if (_file_index > _seg_number + segRetain) {
onDelSegment(_file_index - _seg_number - segRetain - 1);
} }
} }

View File

@ -44,13 +44,17 @@ RtmpSession::~RtmpSession() {
} }
void RtmpSession::onError(const SockException& err) { void RtmpSession::onError(const SockException& err) {
WarnP(this) << err.what(); bool isPlayer = !_pPublisherSrc;
WarnP(this) << (isPlayer ? "播放器(" : "推流器(")
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< ")断开:" << err.what();
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
bool isPlayer = !_pPublisherSrc;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo, _mediaInfo,
_ui64TotalBytes, _ui64TotalBytes,
@ -194,7 +198,10 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
*this); *this);
if(!flag){ if(!flag){
//该事件无人监听,默认鉴权成功 //该事件无人监听,默认鉴权成功
onRes("",true,true,false); GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
onRes("",toRtxp,toHls,toMP4);
} }
} }

View File

@ -122,7 +122,7 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co
_eType = eType; _eType = eType;
auto ip = FindField(strUrl.data(), "://", "/"); auto ip = FindField(strUrl.data(), "://", "/");
if (!ip.size()) { if (ip.empty()) {
ip = split(FindField(strUrl.data(), "://", NULL),"?")[0]; ip = split(FindField(strUrl.data(), "://", NULL),"?")[0];
} }
auto port = atoi(FindField(ip.data(), ":", NULL).data()); auto port = atoi(FindField(ip.data(), ":", NULL).data());
@ -134,6 +134,11 @@ void RtspPlayer::play(bool isSSL,const string &strUrl, const string &strUser, co
ip = FindField(ip.data(), NULL, ":"); ip = FindField(ip.data(), NULL, ":");
} }
if(ip.empty()){
onPlayResult_l(SockException(Err_other,StrPrinter << "illegal rtsp url:" << strUrl));
return;
}
_strUrl = strUrl; _strUrl = strUrl;
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this()); weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
@ -272,6 +277,13 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){
throw std::runtime_error("open rtcp sock failed"); throw std::runtime_error("open rtcp sock failed");
} }
} }
if(rtpSockRef->get_local_port() % 2 != 0){
//如果rtp端口不是偶数那么与rtcp端口互换目的是兼容一些要求严格的服务器
Socket::Ptr tmp = rtpSockRef;
rtpSockRef = rtcpSockRef;
rtcpSockRef = tmp;
}
} }

View File

@ -85,7 +85,13 @@ RtspSession::~RtspSession() {
} }
void RtspSession::onError(const SockException& err) { void RtspSession::onError(const SockException& err) {
WarnP(this) << err.what(); bool isPlayer = !_pushSrc;
WarnP(this) << (isPlayer ? "播放器(" : "推流器(")
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< ")断开:" << err.what();
if (_rtpType == Rtsp::RTP_MULTICAST) { if (_rtpType == Rtsp::RTP_MULTICAST) {
//取消UDP端口监听 //取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
@ -100,7 +106,6 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播 //流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){ if(_ui64TotalBytes > iFlowThreshold * 1024){
bool isPlayer = !_pushSrc;
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
_mediaInfo, _mediaInfo,
_ui64TotalBytes, _ui64TotalBytes,
@ -319,7 +324,10 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this); auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this);
if(!flag){ if(!flag){
//该事件无人监听,默认不鉴权 //该事件无人监听,默认不鉴权
onRes("",true,true,false); GET_CONFIG(bool,toRtxp,General::kPublishToRtxp);
GET_CONFIG(bool,toHls,General::kPublishToHls);
GET_CONFIG(bool,toMP4,General::kPublishToMP4);
onRes("",toRtxp,toHls,toMP4);
} }
} }