From 4792f6213b627f93126b6214bc37c79c287019ef Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 4 Jan 2022 15:32:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ETs=E6=8B=89=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Http/HttpClient.cpp | 7 +- src/Http/HttpClient.h | 2 +- src/Http/HttpTSPlayer.cpp | 9 +- src/Http/TsPlayer.cpp | 68 +++++++++++++ src/Http/TsPlayer.h | 60 ++++++++++++ src/Http/TsPlayerImp.h | 194 ++++++++++++++++++++++++++++++++++++++ src/Http/TsplayerImp.cpp | 123 ++++++++++++++++++++++++ src/Player/PlayerBase.cpp | 13 ++- 8 files changed, 467 insertions(+), 9 deletions(-) create mode 100644 src/Http/TsPlayer.cpp create mode 100644 src/Http/TsPlayer.h create mode 100644 src/Http/TsPlayerImp.h create mode 100644 src/Http/TsplayerImp.cpp diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index e4829429..d0cc9dc9 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -15,7 +15,8 @@ namespace mediakit { -void HttpClient::sendRequest(const string &url, float timeout_sec) { +void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_timeout_sec) { + _recv_timeout_second = recv_timeout_sec; clearResponse(); _url = url; auto protocol = FindField(url.data(), NULL, "://"); @@ -188,7 +189,7 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) { } if (onRedirectUrl(new_url, _parser.Url() == "302")) { setMethod("GET"); - HttpClient::sendRequest(new_url, _timeout_second); + HttpClient::sendRequest(new_url, _timeout_second, _recv_timeout_second); return 0; } } @@ -278,7 +279,7 @@ void HttpClient::onFlush() { } void HttpClient::onManager() { - if (_recv_timeout_ticker.elapsedTime() > 3 * 1000 && _total_body_size < 0 && !_chunked_splitter) { + if (_recv_timeout_ticker.elapsedTime() > _recv_timeout_second * 1000 && _total_body_size < 0 && !_chunked_splitter) { //如果Content-Length未指定 但接收数据超时 //则认为本次http请求完成 onResponseCompleted_l(); diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 69d7ff46..6fb4967d 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -63,7 +63,7 @@ public: * @param url 请求url * @param timeout_sec 超时时间 */ - virtual void sendRequest(const string &url, float timeout_sec); + virtual void sendRequest(const string &url, float timeout_sec, float recv_timeout_sec = 3); /** * 重置对象 diff --git a/src/Http/HttpTSPlayer.cpp b/src/Http/HttpTSPlayer.cpp index 68e2db1a..62574aa6 100644 --- a/src/Http/HttpTSPlayer.cpp +++ b/src/Http/HttpTSPlayer.cpp @@ -49,7 +49,14 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_si } if (_split_ts) { - _segment.input(buf, size); + try { + _segment.input(buf, size); + }catch (std::exception &ex) { + WarnL << ex.what(); + //ts解析失败,清空缓存数据 + _segment.reset(); + throw; + } } else { onPacket(buf, size); } diff --git a/src/Http/TsPlayer.cpp b/src/Http/TsPlayer.cpp new file mode 100644 index 00000000..0ba2e895 --- /dev/null +++ b/src/Http/TsPlayer.cpp @@ -0,0 +1,68 @@ +// +// Created by alex on 2021/4/6. +// + +#include "TsPlayer.h" + +namespace mediakit { + + TsPlayer::TsPlayer(const EventPoller::Ptr &poller):HttpTSPlayer(poller, true) {} + + TsPlayer::~TsPlayer() {} + + void TsPlayer::play(const string &strUrl) { + _ts_url.append(strUrl); + playTs(); + } + + void TsPlayer::teardown_l(const SockException &ex) { + HttpClient::clear(); + shutdown(ex); + } + + void TsPlayer::teardown() { + teardown_l(SockException(Err_shutdown, "teardown")); + } + + void TsPlayer::playTs() { + if (waitResponse()) { + //播放器目前还存活,正在下载中 + return; + } + WarnL << "fetch:" << _ts_url; + _request_complete = false; + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + setMethod("GET"); + sendRequest(_ts_url, 3600 * 2, 60); + } + + void TsPlayer::onResponseCompleted() { + //接收完毕 + teardown_l(SockException(Err_success, StrPrinter << _ts_url << ": play completed")); + } + + void TsPlayer::onDisconnect(const SockException &ex) { + WarnL << _ts_url << " :" << ex.getErrCode() << " " << ex.what(); + if (_first) { + //第一次失败,则播放失败 + _first = false; + onPlayResult(ex); + return; + } + if (ex.getErrCode() == Err_shutdown) { + onShutdown(ex); + }else{ + onResponseCompleted(); + onShutdown(ex); + } + } + + ssize_t TsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) { + ssize_t ret = HttpTSPlayer::onResponseHeader(status, header); + if (_first) { + _first = false; + onPlayResult(SockException(Err_success, "play success")); + } + return ret; + } +}//namespace mediakit \ No newline at end of file diff --git a/src/Http/TsPlayer.h b/src/Http/TsPlayer.h new file mode 100644 index 00000000..72e2f007 --- /dev/null +++ b/src/Http/TsPlayer.h @@ -0,0 +1,60 @@ +// +// Created by alex on 2021/4/6. +// + +/* + * Copyright (c) 2020 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef HTTP_TSPLAYER_H +#define HTTP_TSPLAYER_H + +#include +#include "Util/util.h" +#include "Poller/Timer.h" +#include "Http/HttpDownloader.h" +#include "Player/MediaPlayer.h" +#include "Rtp/Decoder.h" +#include "Rtp/TSDecoder.h" +#include "HttpTSPlayer.h" + + + +using namespace toolkit; +namespace mediakit { + + class TsPlayer : public HttpTSPlayer , public PlayerBase { + public: + TsPlayer(const EventPoller::Ptr &poller); + ~TsPlayer() override; + /** + * 开始播放 + * @param strUrl + */ + void play(const string &strUrl) override; + /** + * 停止播放 + */ + void teardown() override; + private: + void playTs(); + void teardown_l(const SockException &ex); + + protected: + virtual void onResponseCompleted() override; + + virtual void onDisconnect(const SockException &ex) override; + + virtual ssize_t onResponseHeader(const string &status, const HttpHeader &header) override; + private: + bool _first = true; + string _ts_url; + }; +}//namespace mediakit +#endif //HTTP_TSPLAYER_H diff --git a/src/Http/TsPlayerImp.h b/src/Http/TsPlayerImp.h new file mode 100644 index 00000000..b548892c --- /dev/null +++ b/src/Http/TsPlayerImp.h @@ -0,0 +1,194 @@ +// +// Created by alex on 2021/7/5. +// + +#ifndef HTTP_TSPLAYERIMP_H +#define HTTP_TSPLAYERIMP_H + +#include +#include "Util/util.h" +#include "Poller/Timer.h" +#include "Http/HttpDownloader.h" +#include "Player/MediaPlayer.h" +#include "Rtp/Decoder.h" +#include "Rtp/TSDecoder.h" +#include "TsPlayer.h" + +using namespace toolkit; +namespace mediakit { + + class TsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this { + public: + TsDemuxer() = default; + + ~TsDemuxer() override { _timer = nullptr; } + + void start(const EventPoller::Ptr &poller, TrackListener *listener); + + bool inputFrame(const Frame::Ptr &frame) override; + + bool addTrack(const Track::Ptr &track) override { + return _delegate.addTrack(track); + } + + void addTrackCompleted() override { + _delegate.addTrackCompleted(); + } + + void resetTracks() override { + ((MediaSink &) _delegate).resetTracks(); + } + + vector getTracks(bool ready = true) const override { + return _delegate.getTracks(ready); + } + + private: + void onTick(); + + int64_t getBufferMS(); + + int64_t getPlayPosition(); + + void setPlayPosition(int64_t pos); + + private: + int64_t _ticker_offset = 0; + Ticker _ticker; + Stamp _stamp[2]; + Timer::Ptr _timer; + MediaSinkDelegate _delegate; + multimap _frame_cache; + }; + +// class TsPlayerImp : public PlayerImp, private TrackListener { +// public: +// typedef std::shared_ptr Ptr; +// TsPlayerImp(const EventPoller::Ptr &poller = nullptr); +// ~TsPlayerImp() override = default; +// +// private: +// //// TsPlayer override//// +// void onPacket(const char *data, size_t len) override{ +// +// if (!_decoder) { +// _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); +// } +// +// if (_decoder && _demuxer) { +// _decoder->input((uint8_t *) data, len); +// } +// +// } +// //// PlayerBase override//// +// void onPlayResult(const SockException &ex) override{ +// WarnL << ex.getErrCode() << " " << ex.what(); +// if (ex) { +// PlayerImp::onPlayResult(ex); +// } else { +// auto demuxer = std::make_shared(); +// demuxer->start(getPoller(), this); +// _demuxer = std::move(demuxer); +// } +// } +// +// bool inputFrame(const Frame::Ptr &frame) override{ +// //计算相对时间戳 +// int64_t dts, pts; +//// _stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts); +//// WarnL << getTrackString(frame->getTrackType()) << "当前dts/pts[" << frame->dts() << "/" << frame->pts() << "]["<dts(), Frame::getCacheAbleFrame(frame)); +//// WarnL << "缓存时间[" << getBufferMS() << "]"; +// if (getBufferMS() > 180 * 1000) { +// // 缓存限制最大180秒, 超过180秒强制消费60秒(减少延时或内存占用) +// // 拉取http-ts流时, 部分的上游输出并不是平滑的, 也就是可能一秒输出几mb数据, 然后会间隔几秒甚至十来秒再次输出, +// // 这种情况特别会出现在跨国家地域的拉流情况中, 一部分是上游源输出的问题, 一部分是由于国际出口网络或者isp导致的失速问题. +// // 对于输出http-ts/rtmp/rtsp等流媒体影响并不大, 但是如果输出hls, 那么就会导致m3u8文件中更新segment并不是平滑的, +// // 而是会出现跳段的现象. +// // 所以如果主要是输出hls, 那么最好是不强制消费, 只通过tick周期性平滑消费帧.因为输出是hls, 等于一直在进行消费. +// // 因此不会存在内存占用过大的问题. +// // 所以比较好的解决方法应该是判断是否存在hls输出, 如果存在hls输出则跳过这个限制缓存时间的逻辑.否则限制缓存时间为30秒, +// // 超过缓存时间则强制消费15秒. 但是目前并没有找到一个低成本的判断是否存在hls输出的方法, 所以只有增加缓存时间. +// while (getBufferMS() > 60 * 1000) { +// MediaSink::inputFrame(_frame_cache.begin()->second); +// _frame_cache.erase(_frame_cache.begin()); +// } +// //接着播放缓存中最早的帧 +// setPlayPosition(_frame_cache.begin()->first); +// } +// return true; +// } +// +// void onShutdown(const SockException &ex) override { +// PlayerImp::onShutdown(ex); +// _demuxer = nullptr; +// } +// +// void onTick(){ +// auto it = _frame_cache.begin(); +// while (it != _frame_cache.end()) { +// if (it->first > getPlayPosition()) { +// //这些帧还未到时间播放 +// break; +// } +// if (getBufferMS() < 3 * 1000) { +// //缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕) +// //目的是为了防止定时器长时间干等后,数据瞬间消费完毕 +// setPlayPosition(_frame_cache.begin()->first); +// } +// //消费掉已经到期的帧 +// MediaSink::inputFrame(it->second); +// it = _frame_cache.erase(it); +// } +// } +// //// TrackListener override//// +// bool addTrack(const Track::Ptr &track) override { return true; }; +// +// void addTrackCompleted() override{ +// PlayerImp::onPlayResult(SockException(Err_success, "play hls success")); +// }; +// +// private: +// DecoderImp::Ptr _decoder; +// MediaSinkInterface::Ptr _demuxer; +// }; +// + class TsPlayerImp : public PlayerImp, private TrackListener { + public: + typedef std::shared_ptr Ptr; + + TsPlayerImp(const EventPoller::Ptr &poller = nullptr); + + ~TsPlayerImp() override = default; + + private: + //// HlsPlayer override//// + void onPacket(const char *data, size_t len) override; + + private: + //// PlayerBase override//// + void onPlayResult(const SockException &ex) override; + + vector getTracks(bool ready = true) const override; + + void onShutdown(const SockException &ex) override; + + private: + //// TrackListener override//// + bool addTrack(const Track::Ptr &track) override { return true; }; + + void addTrackCompleted() override; + + private: + DecoderImp::Ptr _decoder; + MediaSinkInterface::Ptr _demuxer; + }; +}//namespace mediakit +#endif //HTTP_TSPLAYERIMP_H diff --git a/src/Http/TsplayerImp.cpp b/src/Http/TsplayerImp.cpp new file mode 100644 index 00000000..8f0d684a --- /dev/null +++ b/src/Http/TsplayerImp.cpp @@ -0,0 +1,123 @@ +#include "TsPlayerImp.h" + +namespace mediakit { + void TsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) { + _frame_cache.clear(); + _stamp[TrackAudio].setRelativeStamp(0); + _stamp[TrackVideo].setRelativeStamp(0); + _stamp[TrackAudio].syncTo(_stamp[TrackVideo]); + setPlayPosition(0); + + _delegate.setTrackListener(listener); + + //每50毫秒执行一次 + weak_ptr weak_self = shared_from_this(); + _timer = std::make_shared(0.05f, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + strong_self->onTick(); + return true; + }, poller); + } + + bool TsDemuxer::inputFrame(const Frame::Ptr &frame) { + //为了避免track准备时间过长, 因此在没准备好之前, 直接消费掉所有的帧 + if (!_delegate.isAllTrackReady()) { + _delegate.inputFrame(frame); + return true; + } + //计算相对时间戳 + int64_t dts, pts; + //根据时间戳缓存frame + _stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts); + _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame)); + //根据时间戳缓存frame +// _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame)); + + if (getBufferMS() > 30 * 1000) { + //缓存超过30秒,强制消费至15秒(减少延时或内存占用) + while (getBufferMS() > 15 * 1000) { + _delegate.inputFrame(_frame_cache.begin()->second); + _frame_cache.erase(_frame_cache.begin()); + } + //接着播放缓存中最早的帧 + setPlayPosition(_frame_cache.begin()->first); + } + return true; + } + + int64_t TsDemuxer::getPlayPosition() { + return _ticker.elapsedTime() + _ticker_offset; + } + + int64_t TsDemuxer::getBufferMS() { + if (_frame_cache.empty()) { + return 0; + } + return _frame_cache.rbegin()->first - _frame_cache.begin()->first; + } + + void TsDemuxer::setPlayPosition(int64_t pos) { + _ticker.resetTime(); + _ticker_offset = pos; + } + + void TsDemuxer::onTick() { + auto it = _frame_cache.begin(); + while (it != _frame_cache.end()) { + if (it->first > getPlayPosition()) { + //这些帧还未到时间播放 + break; + } + if (getBufferMS() < 3 * 1000) { + //缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕) + //目的是为了防止定时器长时间干等后,数据瞬间消费完毕 + setPlayPosition(_frame_cache.begin()->first); + } + //消费掉已经到期的帧 + _delegate.inputFrame(it->second); + it = _frame_cache.erase(it); + } + } + +////////////////////////////////////////////////////////////////////////// + + TsPlayerImp::TsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {} + + void TsPlayerImp::onPacket(const char *data, size_t len) { + if (!_decoder) { + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); + } + + if (_decoder && _demuxer) { + _decoder->input((uint8_t *) data, len); + } + } + + void TsPlayerImp::addTrackCompleted() { + PlayerImp::onPlayResult(SockException(Err_success, "play hls success")); + } + + void TsPlayerImp::onPlayResult(const SockException &ex) { + WarnL << ex.getErrCode() << " " << ex.what(); + if (ex) { + PlayerImp::onPlayResult(ex); + } else { + auto demuxer = std::make_shared(); + demuxer->start(getPoller(), this); + _demuxer = std::move(demuxer); + } + } + + void TsPlayerImp::onShutdown(const SockException &ex) { + PlayerImp::onShutdown(ex); + _demuxer = nullptr; + } + + vector TsPlayerImp::getTracks(bool ready) const { + return static_pointer_cast(_demuxer)->getTracks(ready); + } + +}//namespace mediakit \ No newline at end of file diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index ddab7f3e..ff579217 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -13,7 +13,7 @@ #include "Rtsp/RtspPlayerImp.h" #include "Rtmp/RtmpPlayerImp.h" #include "Http/HlsPlayer.h" - +#include "Http/TsPlayerImp.h" using namespace toolkit; namespace mediakit { @@ -48,9 +48,14 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller, const s if (strcasecmp("rtmp", prefix.data()) == 0) { return PlayerBase::Ptr(new RtmpPlayerImp(poller), releasePlayer); } - - if ((strcasecmp("http", prefix.data()) == 0 || strcasecmp("https", prefix.data()) == 0) && end_with(url, ".m3u8")) { - return PlayerBase::Ptr(new HlsPlayerImp(poller), releasePlayer); + if ((strcasecmp("http",prefix.data()) == 0 || strcasecmp("https",prefix.data()) == 0)) { + if (end_with(url, ".m3u8") || end_with(url_in, ".m3u8")) { + return PlayerBase::Ptr(new HlsPlayerImp(poller),releasePlayer); + } + else if (end_with(url, ".ts") || end_with(url_in, ".ts")) { + return PlayerBase::Ptr(new TsPlayerImp(poller),releasePlayer); + } + return PlayerBase::Ptr(new TsPlayerImp(poller),releasePlayer); } return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer);