diff --git a/src/Http/HttpBody.h b/src/Http/HttpBody.h index fd2b4318..bf9e5cfe 100644 --- a/src/Http/HttpBody.h +++ b/src/Http/HttpBody.h @@ -54,14 +54,14 @@ public: /** * 剩余数据大小 */ - virtual uint64_t remainSize() = 0; + virtual uint64_t remainSize() { return 0;}; /** * 读取一定字节数,返回大小可能小于size * @param size 请求大小 * @return 字节对象 */ - virtual Buffer::Ptr readData(uint32_t size) = 0; + virtual Buffer::Ptr readData(uint32_t size) { return nullptr;}; }; /** diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index ce45f379..03e00b33 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -44,7 +44,6 @@ #include "Util/base64.h" #include "Util/SHA1.h" #include "Rtmp/utils.h" -#include "HttpBody.h" using namespace toolkit; namespace mediakit { @@ -103,6 +102,41 @@ get_mime_type(const char* name) { return it->second.data(); } +//////////////////////////////////////////////////////////////////////////////////////////////////// + +void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{ + if(_lambad){ + _lambad(codeOut,headerOut,body); + } +} + +void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{ + this->operator()(codeOut,headerOut,std::make_shared(body)); +} + +HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){ + _lambad = lambda; +} + +HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){ + if(!lambda){ + _lambad = nullptr; + return; + } + _lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){ + string str; + if(body && body->remainSize()){ + str = body->readData(body->remainSize())->toString(); + } + lambda(codeOut,headerOut,str); + }; +} + +HttpResponseInvokerImp::operator bool(){ + return _lambad.operator bool(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { TraceP(this); @@ -293,7 +327,7 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ if(!cb) { //找到rtmp源,发送http头,负载后续发送 - sendResponse("200 OK", makeHttpHeader(false, 0, get_mime_type(".flv")), ""); + sendResponse("200 OK", makeHttpHeader(false, -1, get_mime_type(".flv")), ""); }else{ cb(); } @@ -503,7 +537,6 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { return; } - //事件未被拦截,则认为是http下载请求 auto fullUrl = string(HTTP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl(); _mediaInfo.parse(fullUrl); @@ -609,66 +642,16 @@ void HttpSession::Handle_Req_GET(int64_t &content_len) { if(cookie){ httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookiePathKey].get()); } - //先回复HTTP头部分 - sendResponse(pcHttpResult,httpHeader,""); - + if (iRangeEnd - iRangeStart < 0) { + sendResponse(pcHttpResult,httpHeader,""); //文件是空的! throw SockException(bClose ? Err_shutdown : Err_success,"close connection after access file"); } - //回复Content部分 - GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); + + //先回复HTTP头部分 HttpBody::Ptr fileBody = std::make_shared(pFilePtr,iRangeStart,iRangeEnd - iRangeStart + 1); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - - auto onFlush = [fileBody,bClose,weakSelf]() { - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ - //本对象已经销毁 - return false; - } - while(true){ - //更新超时计时器 - strongSelf->_ticker.resetTime(); - //读取文件 - auto sendBuf = fileBody->readData(sendBufSize); - if (!sendBuf) { - //文件读完 - if(strongSelf->isSocketBusy()){ - //套接字忙,我们等待触发下一次onFlush事件 - //待所有数据flush到socket fd再移除onFlush事件监听 - //标记文件读写完毕 - return true; - } - //文件全部flush到socket fd,可以直接关闭socket了 - break; - } - - //文件还未读完 - if(strongSelf->send(sendBuf) == -1) { - //socket已经销毁,不再监听onFlush事件 - return false; - } - if(strongSelf->isSocketBusy()){ - //socket忙,那么停止继续写,等待下一次onFlush事件,然后再读文件写socket - return true; - } - //socket还可写,继续写socket - } - - if(bClose) { - //最后一次flush事件,文件也发送完毕了,可以关闭socket了 - strongSelf->shutdown(SockException(Err_shutdown,"read file eof")); - } - //不再监听onFlush事件 - return false; - }; - - //文件下载提升发送性能 - setSocketFlags(); - - onFlush(); - _sock->setOnFlush(onFlush); + sendResponse(pcHttpResult,httpHeader,fileBody,bClose); }); } @@ -772,17 +755,84 @@ bool makeMeun(const string &httpPath,const string &strFullPath, string &strRet) } void HttpSession::sendResponse(const char* pcStatus, const KeyValue& header, const string& strContent) { - _StrPrinter printer; - printer << "HTTP/1.1 " << pcStatus << "\r\n"; - for (auto &pr : header) { - printer << pr.first << ": " << pr.second << "\r\n"; - } - printer << "\r\n" << strContent; - auto strSend = printer << endl; - send(strSend); - _ticker.resetTime(); + sendResponse(pcStatus, header,strContent.empty() ? nullptr : std::make_shared(strContent), false); } +void HttpSession::sendResponse(const char *pcStatus,const KeyValue &header,const HttpBody::Ptr &body,bool bClose){ + //发送http头 + _StrPrinter printer; + printer << "HTTP/1.1 " << pcStatus << "\r\n"; + for (auto &pr : header) { + printer << pr.first << ": " << pr.second << "\r\n"; + } + printer << "\r\n"; + send(printer << endl); + _ticker.resetTime(); + + if(!body || !body->remainSize()){ + //没有body + if(bClose){ + shutdown(SockException(Err_shutdown,"close connection after send http header completed")); + } + return; + } + + //发送http body + GET_CONFIG(uint32_t,sendBufSize,Http::kSendBufSize); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + + auto onFlush = [body,bClose,weakSelf]() { + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + //本对象已经销毁 + return false; + } + while(true){ + //更新超时计时器 + strongSelf->_ticker.resetTime(); + //读取文件 + auto sendBuf = body->readData(sendBufSize); + if (!sendBuf) { + //文件读完 + if(strongSelf->isSocketBusy() && bClose){ + //套接字忙,我们等待触发下一次onFlush事件 + //待所有数据flush到socket fd再移除onFlush事件监听 + //标记文件读写完毕 + return true; + } + //文件全部flush到socket fd,可以直接关闭socket了 + break; + } + + //文件还未读完 + if(strongSelf->send(sendBuf) == -1) { + //socket已经销毁,不再监听onFlush事件 + return false; + } + if(strongSelf->isSocketBusy()){ + //socket忙,那么停止继续写,等待下一次onFlush事件,然后再读文件写socket + return true; + } + //socket还可写,继续写socket + } + + if(bClose) { + //最后一次flush事件,文件也发送完毕了,可以关闭socket了 + strongSelf->shutdown(SockException(Err_shutdown,"close connection after send http body completed")); + } + //不再监听onFlush事件 + return false; + }; + + if(body->remainSize() > sendBufSize){ + //文件下载提升发送性能 + setSocketFlags(); + } + onFlush(); + _sock->setOnFlush(onFlush); +} + + HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentSize,const char* pcContentType) { KeyValue headerOut; GET_CONFIG(string,charSet,Http::kCharSet); @@ -799,7 +849,7 @@ HttpSession::KeyValue HttpSession::makeHttpHeader(bool bClose, int64_t iContentS auto strContentType = StrPrinter << pcContentType << "; charset=" << charSet << endl; headerOut.emplace("Content-Type",strContentType); } - if(iContentSize > 0){ + if(iContentSize >= 0){ headerOut.emplace("Content-Length", StrPrinter< reqCnt); /////////////////////异步回复Invoker/////////////////////////////// weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const string &contentOut){ + HttpResponseInvoker invoker = [weakSelf,bClose](const string &codeOut, const KeyValue &headerOut, const HttpBody::Ptr &body){ auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; } - strongSelf->async([weakSelf,bClose,codeOut,headerOut,contentOut]() { + strongSelf->async([weakSelf,bClose,codeOut,headerOut,body]() { auto strongSelf = weakSelf.lock(); if(!strongSelf) { + //本对象已经销毁 return; } - strongSelf->responseDelay(bClose,codeOut,headerOut,contentOut); - if(bClose){ - strongSelf->shutdown(SockException(Err_shutdown,"Connection: close")); - } + + if(codeOut.empty()){ + //回调提供的参数异常 + strongSelf->sendNotFound(bClose); + return; + } + + //body默认为空 + int64_t size = 0; + if (body && body->remainSize()) { + //有body,获取body大小 + size = body->remainSize(); + if (size >= INT64_MAX) { + //不固定长度的body,那么不设置content-length字段 + size = -1; + } + } + + auto headerOther = strongSelf->makeHttpHeader(bClose, size, "text/plain"); + for (auto &pr : headerOther) { + //添加默认http头,默认http头不能覆盖用户自定义的头 + const_cast(headerOut).emplace(pr.first, pr.second); + } + strongSelf->sendResponse(codeOut.data(), headerOut, body, bClose); }); }; ///////////////////广播HTTP事件/////////////////////////// @@ -857,7 +928,7 @@ bool HttpSession::emitHttpEvent(bool doInvoke){ NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastHttpRequest,_parser,invoker,consumed,*this); if(!consumed && doInvoke){ //该事件无人消费,所以返回404 - invoker("404 Not Found",KeyValue(),""); + invoker("404 Not Found",KeyValue(), HttpBody::Ptr()); if(bClose){ //close类型,回复完毕,关闭连接 shutdown(SockException(Err_shutdown,"404 Not Found")); @@ -938,21 +1009,6 @@ void HttpSession::Handle_Req_POST(int64_t &content_len) { } //有后续content数据要处理,暂时不关闭连接 } -void HttpSession::responseDelay(bool bClose, - const string &codeOut, - const KeyValue &headerOut, - const string &contentOut){ - if(codeOut.empty()){ - sendNotFound(bClose); - return; - } - auto headerOther = makeHttpHeader(bClose,contentOut.size(),"text/plain"); - for (auto &pr : headerOther){ - //添加默认http头,默认http头不能覆盖用户自定义的头 - const_cast(headerOut).emplace(pr.first,pr.second); - } - sendResponse(codeOut.data(), headerOut, contentOut); -} void HttpSession::sendNotFound(bool bClose) { GET_CONFIG(string,notFound,Http::kNotFound); diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 19445780..e6276840 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -36,21 +36,43 @@ #include "HttpRequestSplitter.h" #include "WebSocketSplitter.h" #include "HttpCookieManager.h" +#include "HttpBody.h" +#include "Util/function_traits.h" using namespace std; using namespace toolkit; namespace mediakit { +/** + * 该类实现与老代码的兼容适配 + */ +class HttpResponseInvokerImp{ +public: + typedef std::function HttpResponseInvokerLambda0; + typedef std::function HttpResponseInvokerLambda1; + + HttpResponseInvokerImp(){} + ~HttpResponseInvokerImp(){} + template + HttpResponseInvokerImp(const C &c):HttpResponseInvokerImp(typename function_traits::stl_function_type(c)) {} + HttpResponseInvokerImp(const HttpResponseInvokerLambda0 &lambda); + HttpResponseInvokerImp(const HttpResponseInvokerLambda1 &lambda); + + void operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const; + void operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const; + operator bool(); +private: + HttpResponseInvokerLambda0 _lambad; +}; + class HttpSession: public TcpSession, public FlvMuxer, public HttpRequestSplitter, public WebSocketSplitter { public: typedef StrCaseMap KeyValue; - typedef std::function HttpResponseInvoker; + typedef HttpResponseInvokerImp HttpResponseInvoker; /** * @param errMsg 如果为空,则代表鉴权通过,否则为错误提示 @@ -119,11 +141,8 @@ private: void urlDecode(Parser &parser); void sendNotFound(bool bClose); void sendResponse(const char *pcStatus,const KeyValue &header,const string &strContent); + void sendResponse(const char *pcStatus,const KeyValue &header,const HttpBody::Ptr &body,bool bClose); KeyValue makeHttpHeader(bool bClose=false,int64_t iContentSize=-1,const char *pcContentType="text/html"); - void responseDelay(bool bClose, - const string &codeOut, - const KeyValue &headerOut, - const string &contentOut); /** * 判断http客户端是否有权限访问文件的逻辑步骤