From 62dfed39f71f5d0cba2f01cca0f2144510649850 Mon Sep 17 00:00:00 2001 From: xiongguangjie Date: Sun, 5 Jun 2022 11:07:42 +0800 Subject: [PATCH] fix timelatency not take effect and avoid buffer is liner avoid cycle --- srt/PacketQueue.cpp | 173 ++++++++++++++++++++++++---------------- srt/PacketQueue.hpp | 39 +++++---- srt/PacketSendQueue.cpp | 77 ++++++++++++++++++ srt/PacketSendQueue.hpp | 29 +++++++ srt/SrtTransport.cpp | 89 ++++++++++++++------- srt/SrtTransport.hpp | 3 +- 6 files changed, 290 insertions(+), 120 deletions(-) create mode 100644 srt/PacketSendQueue.cpp create mode 100644 srt/PacketSendQueue.hpp diff --git a/srt/PacketQueue.cpp b/srt/PacketQueue.cpp index 2ee6db9a..340f94d0 100644 --- a/srt/PacketQueue.cpp +++ b/srt/PacketQueue.cpp @@ -2,31 +2,62 @@ namespace SRT { +#define MAX_SEQ 0x7fffffff +#define MAX_TS 0xffffffff inline uint32_t genExpectedSeq(uint32_t seq){ - return 0x7fffffff&seq; + return MAX_SEQ & seq; +} +inline bool isSeqEdge(uint32_t seq,uint32_t cap){ + if(seq >(MAX_SEQ - cap)){ + return true; + } + return false; +} + +inline bool isTSCycle(uint32_t first,uint32_t second){ + uint32_t diff; + if(first>second){ + diff = first - second; + }else{ + diff = second - first; + } + + if(diff > (MAX_TS>>1)){ + return true; + }else{ + return false; + } } PacketQueue::PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency) : _pkt_expected_seq(init_seq) , _pkt_cap(max_size) , _pkt_lantency(lantency) { - } - -bool PacketQueue::inputPacket(DataPacket::Ptr pkt) { - if (pkt->packet_seq_number < _pkt_expected_seq) { - // TOO later drop this packet - return false; - } - - _pkt_map[pkt->packet_seq_number] = pkt; - - return true; } +void PacketQueue::tryInsertPkt(DataPacket::Ptr pkt){ -std::list PacketQueue::tryGetPacket() { - std::list re; + if (_pkt_expected_seq <= pkt->packet_seq_number) { + auto diff = pkt->packet_seq_number - _pkt_expected_seq; + if(diff >= (MAX_SEQ>>1)){ + TraceL << "drop packet too later for cycle "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + return; + }else{ + _pkt_map.emplace(pkt->packet_seq_number, pkt); + } + } else { + auto diff = _pkt_expected_seq - pkt->packet_seq_number; + if(diff >= (MAX_SEQ>>1)){ + _pkt_map.emplace(pkt->packet_seq_number, pkt); + TraceL<<" cycle packet "<<"expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + }else{ + TraceL << "drop packet too later "<< "expected seq=" << _pkt_expected_seq << " pkt seq=" << pkt->packet_seq_number; + } + } +} +bool PacketQueue::inputPacket(DataPacket::Ptr pkt,std::list& out) { + tryInsertPkt(pkt); auto it = _pkt_map.find(_pkt_expected_seq); while ( it != _pkt_map.end()) { - re.push_back(it->second); + out.push_back(it->second); _pkt_map.erase(it); _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq+1); it = _pkt_map.find(_pkt_expected_seq); @@ -36,64 +67,37 @@ std::list PacketQueue::tryGetPacket() { // 防止回环 it = _pkt_map.find(_pkt_expected_seq); if(it != _pkt_map.end()){ - re.push_back(it->second); + out.push_back(it->second); _pkt_map.erase(it); } _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); } - while (timeLantency() > _pkt_lantency) { + while (timeLantency() > _pkt_lantency) { it = _pkt_map.find(_pkt_expected_seq); if(it != _pkt_map.end()){ - re.push_back(it->second); + out.push_back(it->second); _pkt_map.erase(it); } _pkt_expected_seq = genExpectedSeq(_pkt_expected_seq + 1); } - return re; -} - - -bool PacketQueue::dropForRecv(uint32_t first,uint32_t last){ - if(first >= last){ - return false; - } - - if(_pkt_expected_seq <= last){ - for(uint32_t i =first;i<=last;++i){ - if(_pkt_map.find(i) != _pkt_map.end()){ - _pkt_map.erase(i); - } - } - _pkt_expected_seq =genExpectedSeq(last+1); - return true; - } - - return false; -} - -bool PacketQueue::dropForSend(uint32_t num){ - if(num <= _pkt_expected_seq){ - return false; - } - decltype(_pkt_map.end()) it; - for(uint32_t i =_pkt_expected_seq;i< num;++i){ - it = _pkt_map.find(i); - if(it != _pkt_map.end()){ - _pkt_map.erase(it); - } - } - _pkt_expected_seq =genExpectedSeq(num); return true; } -DataPacket::Ptr PacketQueue::findPacketBySeq(uint32_t seq){ - auto it = _pkt_map.find(seq); - if(it != _pkt_map.end()){ - return it->second; +bool PacketQueue::drop(uint32_t first, uint32_t last,std::list& out){ + uint32_t end = genExpectedSeq(last+1); + decltype(_pkt_map.end()) it; + for(uint32_t i =_pkt_expected_seq;i< end;){ + it = _pkt_map.find(i); + if(it != _pkt_map.end()){ + out.push_back(it->second); + _pkt_map.erase(it); + } + i = genExpectedSeq(i+1); } - return nullptr; + _pkt_expected_seq = end; + return true; } uint32_t PacketQueue::timeLantency() { @@ -111,8 +115,8 @@ uint32_t PacketQueue::timeLantency() { } if(dur > 0x80000000){ - //WarnL<<"cycle dur "< PacketQueue::getLostSeq() { return re; } + uint32_t end = 0; + uint32_t first,last; + + first = _pkt_map.begin()->second->packet_seq_number; + last = _pkt_map.rbegin()->second->packet_seq_number; + if ((last - first) > (MAX_SEQ >> 1)) { + TraceL << " cycle seq first " << first << " last " << last << " size " << _pkt_map.size(); + end = first; + } else { + end = last; + } PacketQueue::LostPair lost; lost.first = 0; lost.second = 0; uint32_t i = _pkt_expected_seq; bool finish = true; - for(i = _pkt_expected_seq;i<=_pkt_map.rbegin()->first;){ + for(i = _pkt_expected_seq;i<=end;){ if(_pkt_map.find(i) == _pkt_map.end()){ if(finish){ finish = false; @@ -144,7 +159,6 @@ std::list PacketQueue::getLostSeq() { lost.second = i+1; } }else{ - if(!finish){ finish = true; re.push_back(lost); @@ -164,21 +178,42 @@ size_t PacketQueue::getExpectedSize() { if(_pkt_map.empty()){ return 0; } - auto size = _pkt_map.rbegin()->first - _pkt_expected_seq+1; - if(size >= _pkt_cap){ - // 回环 - //WarnL<<"cycle size "<first; + uint32_t min = _pkt_map.begin()->first; + if((max-min)>=(MAX_SEQ>>1)){ + TraceL<<"cycle "<<"expected seq "<<_pkt_expected_seq<<" min "< size){ + return _pkt_cap - size; + } + + if(_pkt_cap > _pkt_map.size()){ + return _pkt_cap - _pkt_map.size(); + } + WarnL<<" cap "<<_pkt_cap<<" expected size "<second->packet_seq_number; + printer<<" last:"<<_pkt_map.rbegin()->second->packet_seq_number; + printer<<" latency:"< +#include #include #include -#include -#include #include +#include -#include "Packet.hpp" +namespace SRT { -namespace SRT{ - -class PacketQueue -{ +// for recv +class PacketQueue { public: using Ptr = std::shared_ptr; - using LostPair = std::pair; + using LostPair = std::pair; - PacketQueue(uint32_t max_size,uint32_t init_seq,uint32_t lantency); + PacketQueue(uint32_t max_size, uint32_t init_seq, uint32_t lantency); ~PacketQueue() = default; - bool inputPacket(DataPacket::Ptr pkt); - std::list tryGetPacket(); + bool inputPacket(DataPacket::Ptr pkt,std::list& out); + uint32_t timeLantency(); std::list getLostSeq(); @@ -28,21 +28,20 @@ public: size_t getAvailableBufferSize(); uint32_t getExpectedSeq(); - bool dropForRecv(uint32_t first,uint32_t last); + bool drop(uint32_t first, uint32_t last,std::list& out); - bool dropForSend(uint32_t num); - - DataPacket::Ptr findPacketBySeq(uint32_t seq); + std::string dump(); private: - std::map _pkt_map; + void tryInsertPkt(DataPacket::Ptr pkt); +private: + + std::map _pkt_map; uint32_t _pkt_expected_seq = 0; uint32_t _pkt_cap; uint32_t _pkt_lantency; - - }; -} +} // namespace SRT -#endif //ZLMEDIAKIT_SRT_PACKET_QUEUE_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_PACKET_QUEUE_H \ No newline at end of file diff --git a/srt/PacketSendQueue.cpp b/srt/PacketSendQueue.cpp new file mode 100644 index 00000000..9ea11aaf --- /dev/null +++ b/srt/PacketSendQueue.cpp @@ -0,0 +1,77 @@ +#include "PacketSendQueue.hpp" + +namespace SRT { + +PacketSendQueue::PacketSendQueue(uint32_t max_size, uint32_t lantency) + : _pkt_cap(max_size) + , _pkt_lantency(lantency) {} +bool PacketSendQueue::drop(uint32_t num) { + decltype(_pkt_cache.begin()) it; + for (it = _pkt_cache.begin(); it != _pkt_cache.end(); ++it) { + if ((*it)->packet_seq_number == num) { + break; + } + } + if (it != _pkt_cache.end()) { + _pkt_cache.erase(_pkt_cache.begin(), it); + } + return true; +} +bool PacketSendQueue::inputPacket(DataPacket::Ptr pkt) { + _pkt_cache.push_back(pkt); + while (_pkt_cache.size() > _pkt_cap) { + _pkt_cache.pop_front(); + } + while (timeLantency() > _pkt_lantency) { + _pkt_cache.pop_front(); + } + return true; +} + +std::list PacketSendQueue::findPacketBySeq(uint32_t start, uint32_t end) { + std::list re; + decltype(_pkt_cache.begin()) it; + for (it = _pkt_cache.begin(); it != _pkt_cache.end(); ++it) { + if ((*it)->packet_seq_number == start) { + break; + } + } + + if (start == end) { + if (it != _pkt_cache.end()) { + re.push_back(*it); + } + return re; + } + + for (; it != _pkt_cache.end(); ++it) { + re.push_back(*it); + if ((*it)->packet_seq_number == end) { + break; + } + } + return re; +} + +uint32_t PacketSendQueue::timeLantency() { + if (_pkt_cache.empty()) { + return 0; + } + auto first = _pkt_cache.front()->timestamp; + auto last = _pkt_cache.back()->timestamp; + uint32_t dur; + + if (last > first) { + dur = last - first; + } else { + dur = first - last; + } + if (dur > (0x01 << 31)) { + TraceL << "cycle timeLantency " << dur; + dur = 0xffffffff - dur; + } + + return dur; +} + +} // namespace SRT \ No newline at end of file diff --git a/srt/PacketSendQueue.hpp b/srt/PacketSendQueue.hpp new file mode 100644 index 00000000..86fa86f1 --- /dev/null +++ b/srt/PacketSendQueue.hpp @@ -0,0 +1,29 @@ +#ifndef ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H +#define ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H +#include "Packet.hpp" +#include +#include +#include +#include +#include +#include +namespace SRT { +class PacketSendQueue { +public: + using Ptr = std::shared_ptr; + using LostPair = std::pair; + PacketSendQueue(uint32_t max_size, uint32_t lantency); + ~PacketSendQueue() = default; + bool drop(uint32_t num); + bool inputPacket(DataPacket::Ptr pkt); + std::list findPacketBySeq(uint32_t start,uint32_t end); +private: + uint32_t timeLantency(); +private: + std::list _pkt_cache; + uint32_t _pkt_cap; + uint32_t _pkt_lantency; +}; +} // namespace SRT + +#endif // ZLMEDIAKIT_SRT_PACKET_SEND_QUEUE_H \ No newline at end of file diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index 4b58b07e..c23614b1 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -198,8 +198,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad registerSelf(); sendControlPacket(res, true); TraceL<<" buf size = "<max_flow_window_size<<" init seq ="<<_init_seq_number<<" lantency="<(res->max_flow_window_size,_init_seq_number, delay*1e6); - _send_buf = std::make_shared(res->max_flow_window_size,_init_seq_number, delay*1e6); + _recv_buf = std::make_shared(res->max_flow_window_size,_init_seq_number, delay*1e3); + _send_buf = std::make_shared(res->max_flow_window_size, delay*1e3); _send_packet_seq_number = _init_seq_number; _buf_delay = delay; onHandShakeFinished(_stream_id,addr); @@ -249,7 +249,7 @@ void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *add pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp); pkt->ack_number = ack.ack_number; pkt->storeToData(); - _send_buf->dropForSend(ack.last_ack_pkt_seq_number); + _send_buf->drop(ack.last_ack_pkt_seq_number); sendControlPacket(pkt,true); //TraceL<<"ack number "<findPacketBySeq(i); - if(data){ - data->R = 1; - data->storeToHeader(); - sendPacket(data,true); - empty = false; - } + auto re_list = _send_buf->findPacketBySeq(it.first,it.second-1); + for(auto pkt : re_list){ + pkt->R = 1; + pkt->storeToHeader(); + sendPacket(pkt,flush); + empty = false; } if(empty){ sendMsgDropReq(it.first,it.second-1); @@ -294,8 +296,45 @@ void SrtTransport::handleShutDown(uint8_t *buf, int len, struct sockaddr_storage void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr){ MsgDropReqPacket pkt; pkt.loadFromData(buf,len); + std::list list; //TraceL<<"drop "<dropForRecv(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num); + _recv_buf->drop(pkt.first_pkt_seq_num,pkt.last_pkt_seq_num,list); + if(list.empty()){ + return; + } + + for(auto data : list){ + onSRTData(std::move(data)); + } + + auto nak_interval = (_rtt+_rtt_variance*4)/2; + if(nak_interval >= 20*1000){ + nak_interval = 20*1000; + } + if(_nak_ticker.elapsedTime(_now)>nak_interval){ + auto lost = _recv_buf->getLostSeq(); + if(!lost.empty()){ + sendNAKPacket(lost); + } + _nak_ticker.resetTime(_now); + } + + if(_ack_ticker.elapsedTime(_now)>10*1000){ + _light_ack_pkt_count = 0; + _ack_ticker.resetTime(_now); + // send a ack per 10 ms for receiver + sendACKPacket(); + }else{ + if(_light_ack_pkt_count >= 64){ + // for high bitrate stream send light ack + // TODO + sendLightACKPacket(); + TraceL<<"send light ack"; + } + _light_ack_pkt_count = 0; + } + _light_ack_pkt_count++; + } void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr){ TraceL; @@ -379,32 +418,23 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora pkt->loadFromData(buf,len); pkt->get_ts = _now; - + std::list list; //TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<timestamp<<" size="<payloadSize()<<\ //" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R; -#if 1 - _recv_buf->inputPacket(pkt); -#else - if(pkt->packet_seq_number%100 == 0){ - // drop - TraceL<<"drop packet"; - TraceL<<"expected size "<<_recv_buf->getExpectedSize()<<" real size="<<_recv_buf->getSize(); - }else{ - _recv_buf->inputPacket(pkt); - } -#endif - //TraceL<<" data number size "<tryGetPacket(); - + _recv_buf->inputPacket(pkt,list); for(auto data : list){ onSRTData(std::move(data)); } + if(list.empty()){ + //TraceL<<_recv_buf->dump(); + } + auto nak_interval = (_rtt+_rtt_variance*4)/2; - if(nak_interval >= 20*1000){ + if(nak_interval <= 20*1000){ nak_interval = 20*1000; } - if(_nak_ticker.elapsedTime(_now)>nak_interval || list.empty()){ + if(_nak_ticker.elapsedTime(_now)>nak_interval){ auto lost = _recv_buf->getLostSeq(); if(!lost.empty()){ sendNAKPacket(lost); @@ -428,7 +458,6 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora _light_ack_pkt_count = 0; } _light_ack_pkt_count++; - //bufCheckInterval(); } diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index af19060e..b9aeca5b 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -13,6 +13,7 @@ #include "Common.hpp" #include "Packet.hpp" #include "PacketQueue.hpp" +#include "PacketSendQueue.hpp" #include "Statistic.hpp" namespace SRT { @@ -110,7 +111,7 @@ private: uint32_t _send_packet_seq_number = 0; uint32_t _send_msg_number = 1; - PacketQueue::Ptr _send_buf; + PacketSendQueue::Ptr _send_buf; uint32_t _buf_delay = 120; PacketQueue::Ptr _recv_buf; uint32_t _rtt = 100*1000;