From e445c7e14c2b18164baf6769bfbef1b4baab3bde Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Fri, 22 Mar 2019 14:37:03 +0800 Subject: [PATCH] =?UTF-8?q?=E9=81=BF=E5=85=8D=E5=86=85=E5=AD=98=E6=8B=B7?= =?UTF-8?q?=E8=B4=9D=EF=BC=8C=E5=A4=A7=E5=B9=85=E6=8F=90=E9=AB=98rtmp?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E7=9A=84=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtmp/Rtmp.h | 11 +++- src/Rtmp/RtmpProtocol.cpp | 125 ++++++++++++++++++++++---------------- src/Rtmp/RtmpProtocol.h | 1 + src/Rtmp/RtmpPusher.cpp | 4 +- src/Rtmp/RtmpSession.cpp | 2 +- 5 files changed, 87 insertions(+), 56 deletions(-) diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index 879987be..1c52a3d3 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -31,6 +31,7 @@ #include #include "Util/util.h" #include "Util/logger.h" +#include "Network/Buffer.h" #include "Network/sockutil.h" using namespace toolkit; @@ -127,7 +128,7 @@ public: #pragma pack(pop) #endif // defined(_WIN32) -class RtmpPacket { +class RtmpPacket : public Buffer{ public: typedef std::shared_ptr Ptr; uint8_t typeId; @@ -139,7 +140,13 @@ public: uint32_t streamId; uint32_t chunkId; std::string strBuf; - +public: + char *data() const override{ + return (char*)strBuf.data(); + } + uint32_t size() const override { + return strBuf.size(); + }; public: RtmpPacket() = default; RtmpPacket(const RtmpPacket &that) = default; diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index 22f8415b..7d614007 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -188,60 +188,83 @@ void RtmpProtocol::sendRequest(int iCmd, const string& str) { sendRtmp(iCmd, _ui32StreamId, str, 0, CHUNK_SERVER_REQUEST); } +class BufferPartial : public Buffer { +public: + BufferPartial(const Buffer::Ptr &buffer,uint32_t offset,uint32_t size){ + _buffer = buffer; + _data = buffer->data() + offset; + _size = size; + } + + ~BufferPartial(){} + + char *data() const override { + return _data; + } + uint32_t size() const override{ + return _size; + } +private: + Buffer::Ptr _buffer; + char *_data; + uint32_t _size; +}; + void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, - const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { - if (iChunkId < 2 || iChunkId > 63) { - auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; - throw std::runtime_error(strErr); - } - - bool bExtStamp = ui32TimeStamp >= 0xFFFFFF; - RtmpHeader header; - header.flags = (iChunkId & 0x3f) | (0 << 6); - header.typeId = ui8Type; - set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); - set_be24(header.bodySize, strBuf.size()); - set_le32(header.streamId, ui32StreamId); - - //估算rtmp包数据大小 - uint32_t capacity = ((bExtStamp ? 5 : 1) * (1 + (strBuf.size() / _iChunkLenOut))) + strBuf.size() + sizeof(header); - uint32_t totalSize = 0; - BufferRaw::Ptr buffer = obtainBuffer(); - buffer->setCapacity(capacity); - memcpy(buffer->data() + totalSize,(char *) &header, sizeof(header)); - totalSize += sizeof(header); - - char acExtStamp[4]; - if (bExtStamp) { - //扩展时间戳 - set_be32(acExtStamp, ui32TimeStamp); - } - size_t pos = 0; - while (pos < strBuf.size()) { - if (pos) { - uint8_t flags = (iChunkId & 0x3f) | (3 << 6); - memcpy(buffer->data() + totalSize,&flags, 1); - totalSize += 1; - } - if (bExtStamp) { - //扩展时间戳 - memcpy(buffer->data() + totalSize,acExtStamp, 4); - totalSize += 4; - } - size_t chunk = min(_iChunkLenOut, strBuf.size() - pos); - memcpy(buffer->data() + totalSize,strBuf.data() + pos, chunk); - totalSize += chunk; - pos += chunk; - } - buffer->setSize(totalSize); - onSendRawData(buffer); - _ui32ByteSent += totalSize; - if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) { - _ui32LastSent = _ui32ByteSent; - sendAcknowledgement(_ui32ByteSent); - } + const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { + sendRtmp(ui8Type,ui32StreamId,std::make_shared(strBuf),ui32TimeStamp,iChunkId); } +void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, + const Buffer::Ptr &buf, uint32_t ui32TimeStamp, int iChunkId){ + if (iChunkId < 2 || iChunkId > 63) { + auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; + throw std::runtime_error(strErr); + } + + bool bExtStamp = ui32TimeStamp >= 0xFFFFFF; + RtmpHeader header; + header.flags = (iChunkId & 0x3f) | (0 << 6); + header.typeId = ui8Type; + set_be24(header.timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); + set_be24(header.bodySize, buf->size()); + set_le32(header.streamId, ui32StreamId); + + //估算rtmp包数据大小 + uint32_t totalSize = 0; + onSendRawData(obtainBuffer((char *) &header, sizeof(header))); + totalSize += sizeof(header); + + char acExtStamp[4]; + if (bExtStamp) { + //扩展时间戳 + set_be32(acExtStamp, ui32TimeStamp); + } + size_t pos = 0; + while (pos < buf->size()) { + if (pos) { + uint8_t flags = (iChunkId & 0x3f) | (3 << 6); + onSendRawData(obtainBuffer((char *) &flags, 1)); + totalSize += 1; + } + if (bExtStamp) { + //扩展时间戳 + onSendRawData(obtainBuffer(acExtStamp, 4)); + totalSize += 4; + } + size_t chunk = min(_iChunkLenOut, buf->size() - pos); + onSendRawData(std::make_shared(buf,pos,chunk)); + totalSize += chunk; + pos += chunk; + } + _ui32ByteSent += totalSize; + if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) { + _ui32LastSent = _ui32ByteSent; + sendAcknowledgement(_ui32ByteSent); + } +} + + void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) { _strRcvBuf.append(pcRawData, iSize); auto cb = _nextHandle; diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 72099f5b..fa395bc9 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -74,6 +74,7 @@ protected: void sendRequest(int iCmd, const string &str); void sendResponse(int iType, const string &str); void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID); + void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, int iChunkID); protected: int _iReqID = 0; uint32_t _ui32StreamId = STREAM_CONTROL; diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 3a0d350d..6eadcdb1 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -200,7 +200,7 @@ inline void RtmpPusher::send_metaData(){ sendRequest(MSG_DATA, enc.data()); src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - sendRtmp(pkt->typeId, _ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId ); + sendRtmp(pkt->typeId, _ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId ); }); _pRtmpReader = src->getRing()->attach(getPoller()); @@ -210,7 +210,7 @@ inline void RtmpPusher::send_metaData(){ if(!strongSelf) { return; } - strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt->strBuf, pkt->timeStamp, pkt->chunkId); + strongSelf->sendRtmp(pkt->typeId, strongSelf->_ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId); }); _pRtmpReader->setDetachCB([weakSelf](){ auto strongSelf = weakSelf.lock(); diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index e99b4853..b2a2c87d 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -547,7 +547,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { CLEAR_ARR(_aui32FirstStamp); modifiedStamp = 0; } - sendRtmp(pkt->typeId, pkt->streamId, pkt->strBuf, modifiedStamp, pkt->chunkId); + sendRtmp(pkt->typeId, pkt->streamId, pkt, modifiedStamp, pkt->chunkId); } void RtmpSession::doDelay(int delaySec, const std::function &fun) {