diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 5f492fa7..f0855052 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -15,7 +15,8 @@ namespace mediakit { -void HttpClient::sendRequest(const string &url, float timeout_sec) { +void HttpClient::sendRequest(const string &url, float timeout_sec, float recv_timeout_sec) { + _recv_timeout_second = recv_timeout_sec; clearResponse(); _url = url; auto protocol = FindField(url.data(), NULL, "://"); @@ -189,7 +190,7 @@ ssize_t HttpClient::onRecvHeader(const char *data, size_t len) { } if (onRedirectUrl(new_url, _parser.Url() == "302")) { setMethod("GET"); - HttpClient::sendRequest(new_url, _timeout_second); + HttpClient::sendRequest(new_url, _timeout_second, _recv_timeout_second); return 0; } } @@ -279,7 +280,7 @@ void HttpClient::onFlush() { } void HttpClient::onManager() { - if (_recv_timeout_ticker.elapsedTime() > 3 * 1000 && _total_body_size < 0 && !_chunked_splitter) { + if (_recv_timeout_ticker.elapsedTime() > _recv_timeout_second * 1000 && _total_body_size < 0 && !_chunked_splitter) { //如果Content-Length未指定 但接收数据超时 //则认为本次http请求完成 onResponseCompleted_l(); diff --git a/src/Http/HttpClient.h b/src/Http/HttpClient.h index 69d7ff46..6fb4967d 100644 --- a/src/Http/HttpClient.h +++ b/src/Http/HttpClient.h @@ -63,7 +63,7 @@ public: * @param url 请求url * @param timeout_sec 超时时间 */ - virtual void sendRequest(const string &url, float timeout_sec); + virtual void sendRequest(const string &url, float timeout_sec, float recv_timeout_sec = 3); /** * 重置对象 diff --git a/src/Http/HttpTSPlayer.cpp b/src/Http/HttpTSPlayer.cpp index d0aa4e37..ff42bd23 100644 --- a/src/Http/HttpTSPlayer.cpp +++ b/src/Http/HttpTSPlayer.cpp @@ -49,7 +49,14 @@ void HttpTSPlayer::onResponseBody(const char *buf, size_t size, size_t recved_si } if (_split_ts) { - _segment.input(buf, size); + try { + _segment.input(buf, size); + }catch (std::exception &ex) { + WarnL << ex.what(); + //ts解析失败,清空缓存数据 + _segment.reset(); + throw; + } } else { onPacket(buf, size); } diff --git a/src/Http/TsPlayer.cpp b/src/Http/TsPlayer.cpp new file mode 100644 index 00000000..0ba2e895 --- /dev/null +++ b/src/Http/TsPlayer.cpp @@ -0,0 +1,68 @@ +// +// Created by alex on 2021/4/6. +// + +#include "TsPlayer.h" + +namespace mediakit { + + TsPlayer::TsPlayer(const EventPoller::Ptr &poller):HttpTSPlayer(poller, true) {} + + TsPlayer::~TsPlayer() {} + + void TsPlayer::play(const string &strUrl) { + _ts_url.append(strUrl); + playTs(); + } + + void TsPlayer::teardown_l(const SockException &ex) { + HttpClient::clear(); + shutdown(ex); + } + + void TsPlayer::teardown() { + teardown_l(SockException(Err_shutdown, "teardown")); + } + + void TsPlayer::playTs() { + if (waitResponse()) { + //播放器目前还存活,正在下载中 + return; + } + WarnL << "fetch:" << _ts_url; + _request_complete = false; + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + setMethod("GET"); + sendRequest(_ts_url, 3600 * 2, 60); + } + + void TsPlayer::onResponseCompleted() { + //接收完毕 + teardown_l(SockException(Err_success, StrPrinter << _ts_url << ": play completed")); + } + + void TsPlayer::onDisconnect(const SockException &ex) { + WarnL << _ts_url << " :" << ex.getErrCode() << " " << ex.what(); + if (_first) { + //第一次失败,则播放失败 + _first = false; + onPlayResult(ex); + return; + } + if (ex.getErrCode() == Err_shutdown) { + onShutdown(ex); + }else{ + onResponseCompleted(); + onShutdown(ex); + } + } + + ssize_t TsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHeader &header) { + ssize_t ret = HttpTSPlayer::onResponseHeader(status, header); + if (_first) { + _first = false; + onPlayResult(SockException(Err_success, "play success")); + } + return ret; + } +}//namespace mediakit \ No newline at end of file diff --git a/src/Http/TsPlayer.h b/src/Http/TsPlayer.h new file mode 100644 index 00000000..72e2f007 --- /dev/null +++ b/src/Http/TsPlayer.h @@ -0,0 +1,60 @@ +// +// Created by alex on 2021/4/6. +// + +/* + * Copyright (c) 2020 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef HTTP_TSPLAYER_H +#define HTTP_TSPLAYER_H + +#include +#include "Util/util.h" +#include "Poller/Timer.h" +#include "Http/HttpDownloader.h" +#include "Player/MediaPlayer.h" +#include "Rtp/Decoder.h" +#include "Rtp/TSDecoder.h" +#include "HttpTSPlayer.h" + + + +using namespace toolkit; +namespace mediakit { + + class TsPlayer : public HttpTSPlayer , public PlayerBase { + public: + TsPlayer(const EventPoller::Ptr &poller); + ~TsPlayer() override; + /** + * 开始播放 + * @param strUrl + */ + void play(const string &strUrl) override; + /** + * 停止播放 + */ + void teardown() override; + private: + void playTs(); + void teardown_l(const SockException &ex); + + protected: + virtual void onResponseCompleted() override; + + virtual void onDisconnect(const SockException &ex) override; + + virtual ssize_t onResponseHeader(const string &status, const HttpHeader &header) override; + private: + bool _first = true; + string _ts_url; + }; +}//namespace mediakit +#endif //HTTP_TSPLAYER_H diff --git a/src/Http/TsPlayerImp.h b/src/Http/TsPlayerImp.h new file mode 100644 index 00000000..80c06167 --- /dev/null +++ b/src/Http/TsPlayerImp.h @@ -0,0 +1,96 @@ +// +// Created by alex on 2021/7/5. +// + +#ifndef HTTP_TSPLAYERIMP_H +#define HTTP_TSPLAYERIMP_H + +#include +#include "Util/util.h" +#include "Poller/Timer.h" +#include "Http/HttpDownloader.h" +#include "Player/MediaPlayer.h" +#include "Rtp/Decoder.h" +#include "Rtp/TSDecoder.h" +#include "TsPlayer.h" + +using namespace toolkit; +namespace mediakit { + + class TsDemuxer : public MediaSinkInterface, public TrackSource, public std::enable_shared_from_this { + public: + TsDemuxer() = default; + + ~TsDemuxer() override { _timer = nullptr; } + + void start(const EventPoller::Ptr &poller, TrackListener *listener); + + bool inputFrame(const Frame::Ptr &frame) override; + + bool addTrack(const Track::Ptr &track) override { + return _delegate.addTrack(track); + } + + void addTrackCompleted() override { + _delegate.addTrackCompleted(); + } + + void resetTracks() override { + ((MediaSink &) _delegate).resetTracks(); + } + + vector getTracks(bool ready = true) const override { + return _delegate.getTracks(ready); + } + + private: + void onTick(); + + int64_t getBufferMS(); + + int64_t getPlayPosition(); + + void setPlayPosition(int64_t pos); + + private: + int64_t _ticker_offset = 0; + Ticker _ticker; + Stamp _stamp[2]; + Timer::Ptr _timer; + MediaSinkDelegate _delegate; + multimap _frame_cache; + }; + + + class TsPlayerImp : public PlayerImp, private TrackListener { + public: + typedef std::shared_ptr Ptr; + + TsPlayerImp(const EventPoller::Ptr &poller = nullptr); + + ~TsPlayerImp() override = default; + + private: + //// HlsPlayer override//// + void onPacket(const char *data, size_t len) override; + + private: + //// PlayerBase override//// + void onPlayResult(const SockException &ex) override; + + vector getTracks(bool ready = true) const override; + + void onShutdown(const SockException &ex) override; + + private: + //// TrackListener override//// + bool addTrack(const Track::Ptr &track) override { return true; }; + + void addTrackCompleted() override; + + private: + DecoderImp::Ptr _decoder; + MediaSinkInterface::Ptr _demuxer; + }; +}//namespace mediakit +#endif //HTTP_TSPLAYERIMP_H diff --git a/src/Http/TsplayerImp.cpp b/src/Http/TsplayerImp.cpp new file mode 100644 index 00000000..8f0d684a --- /dev/null +++ b/src/Http/TsplayerImp.cpp @@ -0,0 +1,123 @@ +#include "TsPlayerImp.h" + +namespace mediakit { + void TsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) { + _frame_cache.clear(); + _stamp[TrackAudio].setRelativeStamp(0); + _stamp[TrackVideo].setRelativeStamp(0); + _stamp[TrackAudio].syncTo(_stamp[TrackVideo]); + setPlayPosition(0); + + _delegate.setTrackListener(listener); + + //每50毫秒执行一次 + weak_ptr weak_self = shared_from_this(); + _timer = std::make_shared(0.05f, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + strong_self->onTick(); + return true; + }, poller); + } + + bool TsDemuxer::inputFrame(const Frame::Ptr &frame) { + //为了避免track准备时间过长, 因此在没准备好之前, 直接消费掉所有的帧 + if (!_delegate.isAllTrackReady()) { + _delegate.inputFrame(frame); + return true; + } + //计算相对时间戳 + int64_t dts, pts; + //根据时间戳缓存frame + _stamp[frame->getTrackType()].revise(frame->dts(), frame->pts(), dts, pts); + _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame)); + //根据时间戳缓存frame +// _frame_cache.emplace(dts, Frame::getCacheAbleFrame(frame)); + + if (getBufferMS() > 30 * 1000) { + //缓存超过30秒,强制消费至15秒(减少延时或内存占用) + while (getBufferMS() > 15 * 1000) { + _delegate.inputFrame(_frame_cache.begin()->second); + _frame_cache.erase(_frame_cache.begin()); + } + //接着播放缓存中最早的帧 + setPlayPosition(_frame_cache.begin()->first); + } + return true; + } + + int64_t TsDemuxer::getPlayPosition() { + return _ticker.elapsedTime() + _ticker_offset; + } + + int64_t TsDemuxer::getBufferMS() { + if (_frame_cache.empty()) { + return 0; + } + return _frame_cache.rbegin()->first - _frame_cache.begin()->first; + } + + void TsDemuxer::setPlayPosition(int64_t pos) { + _ticker.resetTime(); + _ticker_offset = pos; + } + + void TsDemuxer::onTick() { + auto it = _frame_cache.begin(); + while (it != _frame_cache.end()) { + if (it->first > getPlayPosition()) { + //这些帧还未到时间播放 + break; + } + if (getBufferMS() < 3 * 1000) { + //缓存小于3秒,那么降低定时器消费速度(让剩余的数据在3秒后消费完毕) + //目的是为了防止定时器长时间干等后,数据瞬间消费完毕 + setPlayPosition(_frame_cache.begin()->first); + } + //消费掉已经到期的帧 + _delegate.inputFrame(it->second); + it = _frame_cache.erase(it); + } + } + +////////////////////////////////////////////////////////////////////////// + + TsPlayerImp::TsPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {} + + void TsPlayerImp::onPacket(const char *data, size_t len) { + if (!_decoder) { + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); + } + + if (_decoder && _demuxer) { + _decoder->input((uint8_t *) data, len); + } + } + + void TsPlayerImp::addTrackCompleted() { + PlayerImp::onPlayResult(SockException(Err_success, "play hls success")); + } + + void TsPlayerImp::onPlayResult(const SockException &ex) { + WarnL << ex.getErrCode() << " " << ex.what(); + if (ex) { + PlayerImp::onPlayResult(ex); + } else { + auto demuxer = std::make_shared(); + demuxer->start(getPoller(), this); + _demuxer = std::move(demuxer); + } + } + + void TsPlayerImp::onShutdown(const SockException &ex) { + PlayerImp::onShutdown(ex); + _demuxer = nullptr; + } + + vector TsPlayerImp::getTracks(bool ready) const { + return static_pointer_cast(_demuxer)->getTracks(ready); + } + +}//namespace mediakit \ No newline at end of file diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index ddab7f3e..ff579217 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -13,7 +13,7 @@ #include "Rtsp/RtspPlayerImp.h" #include "Rtmp/RtmpPlayerImp.h" #include "Http/HlsPlayer.h" - +#include "Http/TsPlayerImp.h" using namespace toolkit; namespace mediakit { @@ -48,9 +48,14 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller, const s if (strcasecmp("rtmp", prefix.data()) == 0) { return PlayerBase::Ptr(new RtmpPlayerImp(poller), releasePlayer); } - - if ((strcasecmp("http", prefix.data()) == 0 || strcasecmp("https", prefix.data()) == 0) && end_with(url, ".m3u8")) { - return PlayerBase::Ptr(new HlsPlayerImp(poller), releasePlayer); + if ((strcasecmp("http",prefix.data()) == 0 || strcasecmp("https",prefix.data()) == 0)) { + if (end_with(url, ".m3u8") || end_with(url_in, ".m3u8")) { + return PlayerBase::Ptr(new HlsPlayerImp(poller),releasePlayer); + } + else if (end_with(url, ".ts") || end_with(url_in, ".ts")) { + return PlayerBase::Ptr(new TsPlayerImp(poller),releasePlayer); + } + return PlayerBase::Ptr(new TsPlayerImp(poller),releasePlayer); } return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer);