diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 1eeb329f..bdc1fb59 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 1eeb329f1ebbd7439b1147141de7cec3b5e881e8 +Subproject commit bdc1fb594b96c0ec7b56cf2afffa1ff02341cdbb diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 5cd13ed2..226bd2f2 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -237,5 +237,11 @@ void MediaInfo::parse(const string &url){ } } +void MediaSourceEvent::onNoneReader(MediaSource &sender){ + WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); + //没有任何读取器消费该源,表明该源可以关闭了 + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender); +} + } /* namespace mediakit */ \ No newline at end of file diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 436f8128..d3f09b44 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -54,20 +54,17 @@ public: MediaSourceEvent(){}; virtual ~MediaSourceEvent(){}; public: - virtual bool seekTo(uint32_t ui32Stamp){ + virtual bool seekTo(MediaSource &sender,uint32_t ui32Stamp){ //拖动进度条 return false; } - virtual bool close(bool force) { + virtual bool close(MediaSource &sender,bool force) { //通知其停止推流 return false; } - virtual void onNoneReader(MediaSource &sender){ - //没有任何读取器消费该源,表明该源可以关闭了 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender); - } + virtual void onNoneReader(MediaSource &sender); }; class MediaInfo{ @@ -149,7 +146,7 @@ public: if(!listener){ return false; } - return listener->seekTo(ui32Stamp); + return listener->seekTo(*this,ui32Stamp); } virtual uint32_t getTimeStamp(TrackType trackType) = 0; @@ -159,7 +156,7 @@ public: if(!listener){ return false; } - return listener->close(force); + return listener->close(*this,force); } virtual void setListener(const std::weak_ptr &listener){ _listener = listener; diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 10bc187f..5846349f 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -130,7 +130,7 @@ void HttpClient::onConnect(const SockException &ex) { printer << pr.second + "\r\n"; } send(printer << "\r\n"); - onSend(); + onFlush(); } void HttpClient::onRecv(const Buffer::Ptr &pBuf) { @@ -222,12 +222,11 @@ void HttpClient::onRecvContent(const char *data, uint64_t len) { onResponseCompleted_l(); if(biggerThanExpected) { //声明的content数据比真实的小,那么我们只截取前面部分的并断开链接 - shutdown(); - onDisconnect(SockException(Err_other, "http response content size bigger than expected")); + shutdown(SockException(Err_shutdown, "http response content size bigger than expected")); } } -void HttpClient::onSend() { +void HttpClient::onFlush() { _aliveTicker.resetTime(); while (_body && _body->remainSize() && !isSocketBusy()) { auto buffer = _body->readData(); @@ -252,8 +251,7 @@ void HttpClient::onManager() { if (_fTimeOutSec > 0 && _aliveTicker.elapsedTime() > _fTimeOutSec * 1000) { //超时 - onDisconnect(SockException(Err_timeout, "http request timeout")); - shutdown(); + shutdown(SockException(Err_timeout, "http request timeout")); } } diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 14d9640c..bd41acf0 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -299,7 +299,7 @@ protected: virtual void onConnect(const SockException &ex) override; virtual void onRecv(const Buffer::Ptr &pBuf) override; virtual void onErr(const SockException &ex) override; - virtual void onSend() override; + virtual void onFlush() override; virtual void onManager() override; private: void onResponseCompleted_l(); diff --git a/src/Http/HttpDownloader.cpp b/src/Http/HttpDownloader.cpp index 87ebf65d..aac62e0b 100644 --- a/src/Http/HttpDownloader.cpp +++ b/src/Http/HttpDownloader.cpp @@ -66,14 +66,7 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo int64_t HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) { if(status != "200" && status != "206"){ //失败 - shutdown(); - closeFile(); - File::delete_file(_filePath.data()); - if(_onResult){ - auto errMsg = StrPrinter << "Http Status:" << status << endl; - _onResult(Err_other,errMsg,_filePath); - _onResult = nullptr; - } + shutdown(SockException(Err_shutdown,StrPrinter << "Http Status:" << status)); } //后续全部是content return -1; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 29b85dc7..72941d08 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -101,6 +101,7 @@ get_mime_type(const char* name) { HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { + TraceP(this); //设置15秒发送超时时间 pSock->setSendTimeOutSecond(15); //起始接收buffer缓存设置为4K,节省内存 @@ -108,7 +109,7 @@ HttpSession::HttpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { } HttpSession::~HttpSession() { - //DebugL; + TraceP(this); } int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { @@ -124,17 +125,16 @@ int64_t HttpSession::onRecvHeader(const char *header,uint64_t len) { string cmd = _parser.Method(); auto it = g_mapCmdIndex.find(cmd); if (it == g_mapCmdIndex.end()) { - WarnP(this) << cmd; sendResponse("403 Forbidden", makeHttpHeader(true), ""); - shutdown(); - return 0; + shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << cmd)); + return 0; } //默认后面数据不是content而是header int64_t content_len = 0; auto &fun = it->second; if(!(this->*fun)(content_len)){ - shutdown(); + shutdown(SockException(Err_shutdown,"Connection: close")); } //清空解析器节省内存 _parser.Clear(); @@ -156,9 +156,13 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onError(const SockException& err) { -// WarnP(this) << err.what(); - GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); + if(_ticker.createdTime() < 10 * 1000){ + TraceP(this) << err.what(); + }else{ + WarnP(this) << err.what(); + } + GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); if(_ui64TotalBytes > iFlowThreshold * 1024){ NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, @@ -174,8 +178,7 @@ void HttpSession::onManager() { if(_ticker.elapsedTime() > keepAliveSec * 1000){ //1分钟超时 -// WarnP(this) <<"HttpSession timeouted!"; - shutdown(); + shutdown(SockException(Err_timeout,"session timeouted")); } } @@ -233,7 +236,7 @@ inline bool HttpSession::checkLiveFlvStream(){ //未找到该流 sendNotFound(bClose); if(bClose){ - shutdown(); + shutdown(SockException(Err_shutdown,"flv stream not found")); } return; } @@ -242,7 +245,7 @@ inline bool HttpSession::checkLiveFlvStream(){ bool authSuccess = err.empty(); if(!authSuccess){ sendResponse("401 Unauthorized", makeHttpHeader(true,err.size()),err); - shutdown(); + shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); return ; } @@ -258,7 +261,7 @@ inline bool HttpSession::checkLiveFlvStream(){ start(getPoller(),rtmp_src); }catch (std::exception &ex){ //该rtmp源不存在 - shutdown(); + shutdown(SockException(Err_shutdown,"rtmp mediasource released")); } }; @@ -446,7 +449,7 @@ inline bool HttpSession::Handle_Req_GET(int64_t &content_len) { strongSelf->send(sendBuf); } if(bClose) { - strongSelf->shutdown(); + strongSelf->shutdown(SockException(Err_shutdown,"read file eof")); } return false; } @@ -645,7 +648,7 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){ } strongSelf->responseDelay(Origin,bClose,codeOut,headerOut,contentOut); if(bClose){ - strongSelf->shutdown(); + strongSelf->shutdown(SockException(Err_shutdown,"Connection: close")); } }); }; @@ -657,7 +660,7 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){ invoker("404 Not Found",KeyValue(),""); if(bClose){ //close类型,回复完毕,关闭连接 - shutdown(); + shutdown(SockException(Err_shutdown,"404 Not Found")); } } return consumed; @@ -727,7 +730,7 @@ inline bool HttpSession::Handle_Req_POST(int64_t &content_len) { } //连接类型是close类型,收完content就关闭连接 - shutdown(); + shutdown(SockException(Err_shutdown,"recv http content completed")); //content已经接收完毕 return false ; }; @@ -763,7 +766,7 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer) { } void HttpSession::onDetach() { - shutdown(); + shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); } std::shared_ptr HttpSession::getSharedPtr(){ diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index a3835454..87d69eca 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -83,13 +83,11 @@ protected: uint64_t len, uint64_t totalSize, uint64_t recvedSize){ - WarnL << "content数据长度过大,无法处理,请重载HttpSession::onRecvUnlimitedContent"; - shutdown(); + shutdown(SockException(Err_shutdown,"http post content is too huge,default closed")); } void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ - DebugL << "默认关闭WebSocket"; - shutdown(); + shutdown(SockException(Err_shutdown,"websocket connection default closed")); }; void onRecvWebSocketData(const Parser &header,const char *data,uint64_t len){ diff --git a/src/MediaFile/MediaReader.cpp b/src/MediaFile/MediaReader.cpp index 59808c41..626d8e2d 100644 --- a/src/MediaFile/MediaReader.cpp +++ b/src/MediaFile/MediaReader.cpp @@ -167,15 +167,16 @@ void MediaReader::startReadMP4() { readSample(sampleMS, false); _mediaMuxer->setListener(strongSelf); } - bool MediaReader::seekTo(uint32_t ui32Stamp){ + bool MediaReader::seekTo(MediaSource &sender,uint32_t ui32Stamp){ seek(ui32Stamp); return true; } -bool MediaReader::close(bool force){ +bool MediaReader::close(MediaSource &sender,bool force){ if(!force && _mediaMuxer->readerCount() != 0 ){ return false; } _timer.reset(); + WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } diff --git a/src/MediaFile/MediaReader.h b/src/MediaFile/MediaReader.h index 7399844d..cb9d398e 100644 --- a/src/MediaFile/MediaReader.h +++ b/src/MediaFile/MediaReader.h @@ -62,13 +62,13 @@ public: * @param ui32Stamp 偏移量,单位毫秒 * @return */ - bool seekTo(uint32_t ui32Stamp) override; + bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override; /** * 关闭MediaReader的流化进程,会触发该对象放弃自持有 * @return */ - bool close(bool force) override; + bool close(MediaSource &sender,bool force) override; /** * 自动生成MediaReader对象然后查找相关的MediaSource对象 diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 38e306d4..58438471 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -145,7 +145,7 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ return false; }, getPoller()); } -bool PlayerProxy::close(bool force) { +bool PlayerProxy::close(MediaSource &sender,bool force) { if(!force && _mediaMuxer->readerCount() != 0){ return false; } @@ -162,6 +162,7 @@ bool PlayerProxy::close(bool force) { } } }); + WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 8cad05ed..6c25a157 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -79,7 +79,7 @@ public: * 被主动关闭 * @return */ - bool close(bool force) override; + bool close(MediaSource &sender,bool force) override; private: void onNoneReader(MediaSource &sender) override; void rePlay(const string &strUrl,int iFailedCnt); diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index 941dc939..cdb72914 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -70,7 +70,7 @@ void RtmpPlayer::teardown() { CLEAR_ARR(_aiFistStamp); CLEAR_ARR(_aiNowStamp); reset(); - shutdown(); + shutdown(SockException(Err_shutdown,"teardown")); } } void RtmpPlayer::play(const string &strUrl) { diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index abe2ccbe..c4def99f 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -59,7 +59,7 @@ void RtmpPusher::teardown() { } _pPublishTimer.reset(); reset(); - shutdown(); + shutdown(SockException(Err_shutdown,"teardown")); } } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 8b7a7aa9..7c4120aa 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -65,15 +65,13 @@ void RtmpSession::onError(const SockException& err) { void RtmpSession::onManager() { if (_ticker.createdTime() > 15 * 1000) { if (!_pRingReader && !_pPublisherSrc) { - WarnP(this) << "非法链接"; - shutdown(); + shutdown(SockException(Err_timeout,"illegal connection")); } } if (_pPublisherSrc) { //publisher if (_ticker.elapsedTime() > 15 * 1000) { - WarnP(this) << "数据接收超时"; - shutdown(); + shutdown(SockException(Err_timeout,"recv data from rtmp pusher timeout")); } } } @@ -84,8 +82,7 @@ void RtmpSession::onRecv(const Buffer::Ptr &pBuf) { _ui64TotalBytes += pBuf->size(); onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { - WarnP(this) << e.what(); - shutdown(); + shutdown(SockException(Err_shutdown,StrPrinter << "catch exception:" << e.what())); } } @@ -159,12 +156,11 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - WarnP(this) << "onPublish:" - << (authSuccess ? "Already publishing:" : err.data()) << " " - << _mediaInfo._vhost << " " - << _mediaInfo._app << " " - << _mediaInfo._streamid << endl; - shutdown(); + string errMsg = StrPrinter << (authSuccess ? "already publishing:" : err.data()) << " " + << _mediaInfo._vhost << " " + << _mediaInfo._app << " " + << _mediaInfo._streamid; + shutdown(SockException(Err_shutdown,errMsg)); return; } _pPublisherSrc.reset(new RtmpToRtspMediaSource(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid)); @@ -222,12 +218,11 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - WarnP(this) << (authSuccess ? "No such stream:" : err.data()) << " " - << _mediaInfo._vhost << " " - << _mediaInfo._app << " " - << _mediaInfo._streamid - << endl; - shutdown(); + string errMsg = StrPrinter << (authSuccess ? "no such stream:" : err.data()) << " " + << _mediaInfo._vhost << " " + << _mediaInfo._app << " " + << _mediaInfo._streamid; + shutdown(SockException(Err_shutdown,errMsg)); return; } @@ -286,7 +281,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr if (!strongSelf) { return; } - strongSelf->shutdown(); + strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); }); _pPlayerSrc = src; if (src->readerCount() == 1) { diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index e04ae727..58490c1c 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -81,12 +81,12 @@ private: sendResponse(MSG_CMD, invoke.data()); } - bool close(bool force) override { + bool close(MediaSource &sender,bool force) override { if(!force && _pPublisherSrc->readerCount() != 0){ return false; } - InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; - safeShutdown(); + string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + safeShutdown(SockException(Err_shutdown,err)); return true; } private: diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 5b2de918..5f6a14a9 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -51,7 +51,7 @@ RtspPlayer::~RtspPlayer(void) { void RtspPlayer::teardown(){ if (alive()) { sendRtspRequest("TEARDOWN" ,_strContentBase); - shutdown(); + shutdown(SockException(Err_shutdown,"teardown")); } _rtspMd5Nonce.clear(); diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 4b96de81..196d8315 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -25,7 +25,7 @@ RtspPusher::~RtspPusher() { void RtspPusher::teardown() { if (alive()) { sendRtspRequest("TEARDOWN" ,_strContentBase); - shutdown(); + shutdown(SockException(Err_shutdown,"teardown")); } reset(); @@ -329,7 +329,7 @@ inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) { int iTrackIndex = getTrackIndexByTrackType(pkt->type); auto &pSock = _apUdpSock[iTrackIndex]; if (!pSock) { - shutdown(); + shutdown(SockException(Err_shutdown,"udp sock not opened yet")); return; } BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 578976e6..6a7a9f2b 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -72,11 +72,11 @@ static recursive_mutex g_mtxGetter; static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; RtspSession::RtspSession(const Socket::Ptr &pSock) : TcpSession(pSock) { - //设置15秒发送超时时间 - pSock->setSendTimeOutSecond(15); - //起始接收buffer缓存设置为4K,节省内存 - pSock->setReadBuffer(std::make_shared(4 * 1024)); - DebugP(this); + DebugP(this); + //设置15秒发送超时时间 + pSock->setSendTimeOutSecond(15); + //起始接收buffer缓存设置为4K,节省内存 + pSock->setReadBuffer(std::make_shared(4 * 1024)); } RtspSession::~RtspSession() { @@ -84,7 +84,7 @@ RtspSession::~RtspSession() { } void RtspSession::onError(const SockException& err) { - TraceP(this) << err.getErrCode() << " " << err.what(); + WarnP(this) << err.what(); if (_rtpType == Rtsp::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); @@ -113,8 +113,7 @@ void RtspSession::onError(const SockException& err) { void RtspSession::onManager() { if (_ticker.createdTime() > 15 * 1000) { if (_strSession.size() == 0) { - WarnP(this) << "非法链接"; - shutdown(); + shutdown(SockException(Err_timeout,"illegal connection")); return; } } @@ -122,9 +121,8 @@ void RtspSession::onManager() { if ((_rtpType == Rtsp::RTP_UDP || _pushSrc ) && _ticker.elapsedTime() > 15 * 1000) { //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 - WarnP(this) << "RTSP会话超时"; - shutdown(); - return; + shutdown(SockException(Err_timeout,"rtp over udp session timeouted")); + return; } } @@ -169,11 +167,10 @@ void RtspSession::onWholeRtspPacket(Parser &parser) { if (it != s_handler_map.end()) { auto &fun = it->second; if(!(this->*fun)(parser)){ - shutdown(); - } + shutdown(SockException(Err_shutdown,"self close")); + } } else{ - shutdown(); - WarnP(this) << "不支持的rtsp命令:" << strCmd; + shutdown(SockException(Err_shutdown,StrPrinter << "403 Forbidden:" << strCmd)); } } @@ -252,7 +249,7 @@ bool RtspSession::handleReq_RECORD(const Parser &parser){ if(!authSuccess){ //第一次play是播放,否则是恢复播放。只对播放鉴权 sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); - shutdown(); + shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); return; } @@ -260,8 +257,8 @@ bool RtspSession::handleReq_RECORD(const Parser &parser){ for(auto &track : _aTrackInfo){ if (track->_inited == false) { //还有track没有setup - shutdown(); - return; + shutdown(SockException(Err_shutdown,"track not setuped")); + return; } rtp_info << "url=" << _strContentBase << "/" << track->_control_surffix << ","; } @@ -312,9 +309,9 @@ bool RtspSession::handleReq_Describe(const Parser &parser) { auto rtsp_src = dynamic_pointer_cast(src); if (!rtsp_src) { //未找到相应的MediaSource - WarnP(strongSelf.get()) << "No such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; + string err = StrPrinter << "no such stream:" << strongSelf->_mediaInfo._vhost << " " << strongSelf->_mediaInfo._app << " " << strongSelf->_mediaInfo._streamid; strongSelf->send_StreamNotFound(); - strongSelf->shutdown(); + strongSelf->shutdown(SockException(Err_shutdown,err)); return; } //找到了响应的rtsp流 @@ -324,7 +321,7 @@ bool RtspSession::handleReq_Describe(const Parser &parser) { if (strongSelf->_aTrackInfo.empty()) { //该流无效 strongSelf->send_StreamNotFound(); - strongSelf->shutdown(); + strongSelf->shutdown(SockException(Err_shutdown,"can not find any availabe track in sdp")); return; } strongSelf->_strSession = makeRandStr(12); @@ -664,7 +661,7 @@ bool RtspSession::handleReq_Setup(const Parser &parser) { if(!strongSelf) { return; } - strongSelf->safeShutdown(); + strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached")); }); } int iSrvPort = _pBrdcaster->getPort(trackRef->_type); @@ -707,14 +704,14 @@ bool RtspSession::handleReq_Play(const Parser &parser) { if(!authSuccess){ //第一次play是播放,否则是恢复播放。只对播放鉴权 sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); - shutdown(); + shutdown(SockException(Err_shutdown,StrPrinter << "401 Unauthorized:" << err)); return; } auto pMediaSrc = _pMediaSrc.lock(); if(!pMediaSrc){ send_StreamNotFound(); - shutdown(); + shutdown(SockException(Err_shutdown,"rtsp stream released")); return; } @@ -740,8 +737,8 @@ bool RtspSession::handleReq_Play(const Parser &parser) { for(auto &track : _aTrackInfo){ if (track->_inited == false) { //还有track没有setup - shutdown(); - return; + shutdown(SockException(Err_shutdown,"track not setuped")); + return; } track->_ssrc = pMediaSrc->getSsrc(track->_type); track->_seq = pMediaSrc->getSeqence(track->_type); @@ -773,8 +770,8 @@ bool RtspSession::handleReq_Play(const Parser &parser) { if(!strongSelf) { return; } - strongSelf->shutdown(); - }); + strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached")); + }); _pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { auto strongSelf = weakSelf.lock(); if(!strongSelf) { @@ -867,8 +864,7 @@ bool RtspSession::handleReq_Post(const Parser &parser) { _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ auto httpGetterStrong = httpGetterWeak.lock(); if(!httpGetterStrong){ - WarnP(this) << "Http Getter已经释放"; - shutdown(); + shutdown(SockException(Err_shutdown,"http getter released")); return; } @@ -1085,12 +1081,12 @@ inline int RtspSession::getTrackIndexByInterleaved(int interleaved){ return -1; } -bool RtspSession::close(bool force) { +bool RtspSession::close(MediaSource &sender,bool force) { if(!force && _pushSrc->readerCount() != 0){ return false; } - InfoP(this) << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; - safeShutdown(); + string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; + safeShutdown(SockException(Err_shutdown,err)); return true; } @@ -1107,7 +1103,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { int iTrackIndex = getTrackIndexByTrackType(pkt->type); auto &pSock = _apRtpSock[iTrackIndex]; if (!pSock) { - shutdown(); + shutdown(SockException(Err_shutdown,"udp sock not opened yet")); return; } BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index afd66ab0..10c592c6 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -105,7 +105,7 @@ protected: //RtpReceiver override void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; //MediaSourceEvent override - bool close(bool force) override ; + bool close(MediaSource &sender,bool force) override ; //TcpSession override int send(const Buffer::Ptr &pkt) override; diff --git a/src/Shell/ShellSession.cpp b/src/Shell/ShellSession.cpp index 500dbad9..7822245c 100644 --- a/src/Shell/ShellSession.cpp +++ b/src/Shell/ShellSession.cpp @@ -39,26 +39,26 @@ static onceToken s_token([]() { }, nullptr); ShellSession::ShellSession(const Socket::Ptr &_sock) : TcpSession(_sock) { + DebugP(this); pleaseInputUser(); } ShellSession::~ShellSession() { + DebugP(this); } void ShellSession::onRecv(const Buffer::Ptr&buf) { //DebugL << hexdump(buf->data(), buf->size()); GET_CONFIG(uint32_t,maxReqSize,Shell::kMaxReqSize); if (_strRecvBuf.size() + buf->size() >= maxReqSize) { - WarnL << "接收缓冲区溢出!"; - shutdown(); + shutdown(SockException(Err_other,"recv buffer overflow")); return; } _beatTicker.resetTime(); _strRecvBuf.append(buf->data(), buf->size()); if (_strRecvBuf.find("\xff\xf4\xff\0xfd\x06") != std::string::npos) { - WarnL << "收到Ctrl+C."; send("\033[0m\r\n Bye bye!\r\n"); - shutdown(); + shutdown(SockException(Err_other,"received Ctrl+C")); return; } size_t index; @@ -67,16 +67,20 @@ void ShellSession::onRecv(const Buffer::Ptr&buf) { line = _strRecvBuf.substr(0, index); _strRecvBuf.erase(0, index + 2); if (!onCommandLine(line)) { - shutdown(); + shutdown(SockException(Err_other,"exit cmd")); return; } } } +void ShellSession::onError(const SockException &err){ + WarnP(this) << err.what(); +} + void ShellSession::onManager() { if (_beatTicker.elapsedTime() > 1000 * 60 * 5) { //5 miniutes for alive - shutdown(); + shutdown(SockException(Err_timeout,"session timeout")); return; } } diff --git a/src/Shell/ShellSession.h b/src/Shell/ShellSession.h index 95be7792..4c3732ca 100644 --- a/src/Shell/ShellSession.h +++ b/src/Shell/ShellSession.h @@ -41,7 +41,7 @@ public: virtual ~ShellSession(); void onRecv(const Buffer::Ptr &) override; - void onError(const SockException &err) override {}; + void onError(const SockException &err) override; void onManager() override; private: