完善平滑发送机制

This commit is contained in:
xiongziliang 2023-12-02 10:13:17 +08:00
parent 0d28edb424
commit 09b2a4d9f5
1 changed files with 15 additions and 14 deletions

View File

@ -41,7 +41,6 @@ public:
FramePacedSender(uint32_t paced_sender_ms, OnFrame cb) { FramePacedSender(uint32_t paced_sender_ms, OnFrame cb) {
_paced_sender_ms = paced_sender_ms; _paced_sender_ms = paced_sender_ms;
_cb = std::move(cb); _cb = std::move(cb);
_stamp[TrackVideo].syncTo(_stamp[TrackAudio]);
} }
void resetTimer(const EventPoller::Ptr &poller) { void resetTimer(const EventPoller::Ptr &poller) {
@ -57,18 +56,17 @@ public:
bool inputFrame(const Frame::Ptr &frame) override { bool inputFrame(const Frame::Ptr &frame) override {
if (!_timer) { if (!_timer) {
setCurrentStamp(frame->dts());
resetTimer(EventPoller::getCurrentPoller()); resetTimer(EventPoller::getCurrentPoller());
setCurrentStamp(0);
} }
int64_t dts; _cache.emplace_back(frame->dts() + _cache_ms, Frame::getCacheAbleFrame(frame));
_stamp[frame->getTrackType()].revise(frame->dts(), frame->dts(), dts, dts);
_cache.emplace_back(dts + kMinCacheMS, Frame::getCacheAbleFrame(frame));
return true; return true;
} }
private: private:
void onTick() { void onTick() {
auto dst = _cache.empty() ? 0 : _cache.back().first;
while (!_cache.empty()) { while (!_cache.empty()) {
auto &front = _cache.front(); auto &front = _cache.front();
if (getCurrentStamp() < front.first) { if (getCurrentStamp() < front.first) {
@ -76,37 +74,40 @@ private:
break; break;
} }
// 时间到了该消费frame了 // 时间到了该消费frame了
// TraceL << front.second->getCodecName() << " " << front.first << " " << _ticker.elapsedTime();
_cb(front.second); _cb(front.second);
_cache.pop_front(); _cache.pop_front();
} }
if (_cache.empty() && dst) {
// 消费太快,需要增加缓存大小
setCurrentStamp(dst);
_cache_ms += kMinCacheMS;
}
// 消费太慢需要强制flush数据
if (_cache.size() > 25 * 5) { if (_cache.size() > 25 * 5) {
auto dts = _cache.back().first;
// 强制flush数据
WarnL << "Flush frame paced sender cache: " << _cache.size(); WarnL << "Flush frame paced sender cache: " << _cache.size();
while (!_cache.empty()) { while (!_cache.empty()) {
auto &front = _cache.front(); auto &front = _cache.front();
_cb(front.second); _cb(front.second);
_cache.pop_front(); _cache.pop_front();
} }
setCurrentStamp(dts); setCurrentStamp(dst);
} }
} }
uint64_t getCurrentStamp () { uint64_t getCurrentStamp() { return _ticker.elapsedTime() + _stamp_offset; }
return _ticker.elapsedTime() + _stamp_offset;
}
uint64_t setCurrentStamp(uint64_t stamp ) { void setCurrentStamp(uint64_t stamp) {
_stamp_offset = stamp; _stamp_offset = stamp;
_ticker.resetTime(); _ticker.resetTime();
} }
private: private:
uint32_t _paced_sender_ms; uint32_t _paced_sender_ms;
uint32_t _cache_ms = kMinCacheMS;
uint64_t _stamp_offset = 0; uint64_t _stamp_offset = 0;
OnFrame _cb; OnFrame _cb;
Stamp _stamp[2];
Ticker _ticker; Ticker _ticker;
Timer::Ptr _timer; Timer::Ptr _timer;
std::list<std::pair<uint64_t, Frame::Ptr>> _cache; std::list<std::pair<uint64_t, Frame::Ptr>> _cache;