MediaSource头文件重构, 独立出PacketCache.h

精简Frame和Track的头文件
This commit is contained in:
cqm 2022-11-25 21:00:01 +08:00
parent 09fb5346c5
commit 2889335803
29 changed files with 209 additions and 164 deletions

View File

@ -16,7 +16,7 @@
#include "Util/uv_errno.h"
#include "Transcode.h"
#include "Extension/AAC.h"
#include "Common/config.h"
#define MAX_DELAY_SECOND 3
using namespace std;

View File

@ -14,7 +14,6 @@
#include <memory>
#include <string>
#include <functional>
#include "Util/util.h"
#include "Util/TimeTicker.h"
#include "Common/MultiMediaSourceMuxer.h"

View File

@ -7,15 +7,16 @@
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include <mutex>
#include "Util/util.h"
#include "Util/NoticeCenter.h"
#include "Network/sockutil.h"
#include "Network/Session.h"
#include "MediaSource.h"
#include "Common/config.h"
#include "Common/Parser.h"
#include "Record/MP4Reader.h"
#include "PacketCache.h"
using namespace std;
using namespace toolkit;

View File

@ -11,15 +11,10 @@
#ifndef ZLMEDIAKIT_MEDIASOURCE_H
#define ZLMEDIAKIT_MEDIASOURCE_H
#include <mutex>
#include <string>
#include <atomic>
#include <memory>
#include <functional>
#include <unordered_map>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/List.h"
#include "Network/Socket.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
@ -411,84 +406,5 @@ private:
toolkit::ObjectStatistic<MediaSource> _statistic;
};
/// 缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy() = default;
~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, size_t cache_size);
private:
// 音视频的最后时间戳
uint64_t _last_stamp[2] = { 0, 0 };
};
/// 合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = toolkit::List<std::shared_ptr<packet> > >
class PacketCache {
public:
PacketCache() { _cache = std::make_shared<packet_list>(); }
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
bool flag = flushImmediatelyWhenCloseMerge();
if (!flag && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flush();
}
//追加数据到最后
_cache->emplace_back(std::move(pkt));
if (key_pos) {
_key_pos = key_pos;
}
if (flag) {
flush();
}
}
void flush() {
if (_cache->empty()) {
return;
}
onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
bool flushImmediatelyWhenCloseMerge() {
// 一般的协议关闭合并写时立即刷新缓存这样可以减少一帧的延时但是rtp例外
// 因为rtp的包很小一个RtpPacket包中也不是完整的一帧图像所以在关闭合并写时
// 还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时
// 但是却对性能提升很大,这样做还是比较划算的
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency);
if (std::is_same<packet, RtpPacket>::value && rtspLowLatency) {
return true;
}
return std::is_same<packet, RtpPacket>::value ? false : (mergeWriteMS <= 0);
}
private:
bool _key_pos = false;
policy _policy;
std::shared_ptr<packet_list> _cache;
};
} /* namespace mediakit */
#endif //ZLMEDIAKIT_MEDIASOURCE_H

87
src/Common/PacketCache.h Normal file
View File

@ -0,0 +1,87 @@
#ifndef _SRC_PACKET_CACHE_H_
#define _SRC_PACKET_CACHE_H_
#include "Common/config.h"
#include "Util/List.h"
#pragma once
namespace mediakit {
/// 缓存刷新策略类
class FlushPolicy {
public:
FlushPolicy() = default;
~FlushPolicy() = default;
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, size_t cache_size);
private:
// 音视频的最后时间戳
uint64_t _last_stamp[2] = { 0, 0 };
};
/// 合并写缓存模板
/// \tparam packet 包类型
/// \tparam policy 刷新缓存策略
/// \tparam packet_list 包缓存类型
template<typename packet, typename policy = FlushPolicy, typename packet_list = toolkit::List<std::shared_ptr<packet> > >
class PacketCache {
public:
PacketCache() { _cache = std::make_shared<packet_list>(); }
virtual ~PacketCache() = default;
void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr<packet> pkt, bool key_pos) {
bool flag = flushImmediatelyWhenCloseMerge();
if (!flag && _policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) {
flush();
}
//追加数据到最后
_cache->emplace_back(std::move(pkt));
if (key_pos) {
_key_pos = key_pos;
}
if (flag) {
flush();
}
}
void flush() {
if (_cache->empty()) {
return;
}
onFlush(std::move(_cache), _key_pos);
_cache = std::make_shared<packet_list>();
_key_pos = false;
}
virtual void clearCache() {
_cache->clear();
}
virtual void onFlush(std::shared_ptr<packet_list>, bool key_pos) = 0;
private:
bool flushImmediatelyWhenCloseMerge() {
// 一般的协议关闭合并写时立即刷新缓存这样可以减少一帧的延时但是rtp例外
// 因为rtp的包很小一个RtpPacket包中也不是完整的一帧图像所以在关闭合并写时
// 还是有必要缓冲一帧的rtp(也就是时间戳相同的rtp)再输出,这样虽然会增加一帧的延时
// 但是却对性能提升很大,这样做还是比较划算的
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
GET_CONFIG(int, rtspLowLatency, Rtsp::kLowLatency);
if (std::is_same<packet, RtpPacket>::value && rtspLowLatency) {
return true;
}
return std::is_same<packet, RtpPacket>::value ? false : (mergeWriteMS <= 0);
}
private:
bool _key_pos = false;
policy _policy;
std::shared_ptr<packet_list> _cache;
};
}
#endif

View File

@ -23,6 +23,7 @@
#include "G711.h"
#include "L16.h"
#include "Common/Parser.h"
#include "Common/config.h"
using namespace std;

View File

@ -12,6 +12,8 @@
#include "H264.h"
#include "H265.h"
#include "Common/Parser.h"
#include "Common/Stamp.h"
using namespace std;
using namespace toolkit;
@ -29,6 +31,13 @@ Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){
return std::make_shared<FrameCacheAble>(frame);
}
FrameStamp::FrameStamp(Frame::Ptr frame, Stamp &stamp, bool modify_stamp)
{
_frame = std::move(frame);
//覆盖时间戳
stamp.revise(_frame->dts(), _frame->pts(), _dts, _pts, modify_stamp);
}
TrackType getTrackType(CodecId codecId) {
switch (codecId) {
#define XX(name, type, value, str, mpeg_id) case name : return type;

View File

@ -11,14 +11,14 @@
#ifndef ZLMEDIAKIT_FRAME_H
#define ZLMEDIAKIT_FRAME_H
#include <map>
#include <mutex>
#include <functional>
#include "Util/RingBuffer.h"
#include "Network/Socket.h"
#include "Common/Stamp.h"
#include "Util/List.h"
#include "Network/Buffer.h"
namespace mediakit {
class Stamp;
typedef enum {
TrackInvalid = -1,
TrackVideo = 0,
@ -441,11 +441,7 @@ private:
class FrameStamp : public Frame {
public:
using Ptr = std::shared_ptr<FrameStamp>;
FrameStamp(Frame::Ptr frame, Stamp &stamp, bool modify_stamp) {
_frame = std::move(frame);
//覆盖时间戳
stamp.revise(_frame->dts(), _frame->pts(), _dts, _pts, modify_stamp);
}
FrameStamp(Frame::Ptr frame, Stamp &stamp, bool modify_stamp);
~FrameStamp() override {}
uint64_t dts() const override { return (uint64_t)_dts; }

View File

@ -9,6 +9,7 @@
*/
#include "H264Rtp.h"
#include "Common/config.h"
namespace mediakit{

View File

@ -14,7 +14,6 @@
#include <memory>
#include <string>
#include "Frame.h"
#include "Util/RingBuffer.h"
#include "Rtsp/Rtsp.h"
namespace mediakit{

View File

@ -12,6 +12,8 @@
#define ZLMEDIAKIT_FMP4MEDIASOURCE_H
#include "Common/MediaSource.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#define FMP4_GOP_SIZE 512

View File

@ -9,6 +9,7 @@
*/
#include "HlsMediaSource.h"
#include "Common/config.h"
using namespace toolkit;
@ -64,4 +65,40 @@ HlsMediaSource::Ptr HlsCookieData::getMediaSource() const {
return _src.lock();
}
void HlsMediaSource::setIndexFile(std::string index_file)
{
if (!_ring) {
std::weak_ptr<HlsMediaSource> weakSelf = std::dynamic_pointer_cast<HlsMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
_ring = std::make_shared<RingType>(0, std::move(lam));
regist();
}
//赋值m3u8索引文件内容
std::lock_guard<std::mutex> lck(_mtx_index);
_index_file = std::move(index_file);
if (!_index_file.empty()) {
_list_cb.for_each([&](const std::function<void(const std::string& str)>& cb) { cb(_index_file); });
_list_cb.clear();
}
}
void HlsMediaSource::getIndexFile(std::function<void(const std::string& str)> cb)
{
std::lock_guard<std::mutex> lck(_mtx_index);
if (!_index_file.empty()) {
cb(_index_file);
return;
}
//等待生成m3u8文件
_list_cb.emplace_back(std::move(cb));
}
} // namespace mediakit

View File

@ -13,6 +13,7 @@
#include "Common/MediaSource.h"
#include "Util/TimeTicker.h"
#include "Util/RingBuffer.h"
#include <atomic>
namespace mediakit {
@ -41,42 +42,12 @@ public:
/**
* m3u8索引文件内容
*/
void setIndexFile(std::string index_file) {
if (!_ring) {
std::weak_ptr<HlsMediaSource> weakSelf = std::dynamic_pointer_cast<HlsMediaSource>(shared_from_this());
auto lam = [weakSelf](int size) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->onReaderChanged(size);
};
_ring = std::make_shared<RingType>(0, std::move(lam));
regist();
}
//赋值m3u8索引文件内容
std::lock_guard<std::mutex> lck(_mtx_index);
_index_file = std::move(index_file);
if (!_index_file.empty()) {
_list_cb.for_each([&](const std::function<void(const std::string &str)> &cb) { cb(_index_file); });
_list_cb.clear();
}
}
void setIndexFile(std::string index_file);
/**
* m3u8文件
*/
void getIndexFile(std::function<void(const std::string &str)> cb) {
std::lock_guard<std::mutex> lck(_mtx_index);
if (!_index_file.empty()) {
cb(_index_file);
return;
}
//等待生成m3u8文件
_list_cb.emplace_back(std::move(cb));
}
void getIndexFile(std::function<void(const std::string &str)> cb);
/**
* m3u8文件

View File

@ -13,6 +13,7 @@
#include "MP4Muxer.h"
#include "Util/File.h"
#include "Extension/H264.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;

View File

@ -21,8 +21,7 @@
#include "RtmpDemuxer.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#include "Util/TimeTicker.h"
#include "Util/ResourcePool.h"

View File

@ -13,6 +13,9 @@
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
#include "Common/config.h"
#include "Common/Parser.h"
using namespace toolkit;
using namespace std;

View File

@ -13,6 +13,7 @@
#include "Util/util.h"
#include "Util/onceToken.h"
#include "Thread/ThreadPool.h"
#include "Common/Parser.h"
using namespace std;
using namespace toolkit;

View File

@ -13,6 +13,7 @@
#include "PSEncoder.h"
#include "Extension/H264.h"
#include "Rtsp/RtspMuxer.h"
#include "Common/config.h"
using namespace toolkit;

View File

@ -16,7 +16,7 @@
#include "PSEncoder.h"
#include "RawEncoder.h"
#include "Extension/CommonRtp.h"
#include "Common/MediaSource.h"
#include "Common/PacketCache.h"
namespace mediakit{
class RtpCache : protected PacketCache<toolkit::Buffer> {

View File

@ -14,6 +14,7 @@
#include "Thread/WorkThreadPool.h"
#include "Util/uv_errno.h"
#include "RtpCache.h"
#include "Rtcp/RtcpContext.h"
using namespace std;
using namespace toolkit;

View File

@ -15,6 +15,8 @@
#include "Extension/CommonRtp.h"
#include "Rtcp/RtcpContext.h"
#include "Common/MediaSource.h"
#include "Common/MediaSink.h"
namespace mediakit{
//rtp发送客户端支持发送GB28181协议

View File

@ -13,6 +13,7 @@
#include "RtpServer.h"
#include "RtpSelector.h"
#include "Rtcp/RtcpContext.h"
#include "Common/config.h"
using namespace std;
using namespace toolkit;

View File

@ -12,6 +12,8 @@
#include <cinttypes>
#include "Rtsp.h"
#include "Common/Parser.h"
#include "Common/config.h"
#include "Network/Socket.h"
using namespace std;
using namespace toolkit;
@ -589,6 +591,41 @@ RtpPacket::Ptr RtpPacket::create() {
#endif
}
/**
* title类型sdp
* @param dur_sec rtsp点播时长0
* @param header sdp描述
* @param version sdp版本
*/
TitleSdp::TitleSdp(float dur_sec, const std::map<std::string, std::string>& header, int version) : Sdp(0, 0) {
_printer << "v=" << version << "\r\n";
if (!header.empty()) {
for (auto &pr : header) {
_printer << pr.first << "=" << pr.second << "\r\n";
}
}
else {
_printer << "o=- 0 0 IN IP4 0.0.0.0\r\n";
_printer << "s=Streamed by " << kServerName << "\r\n";
_printer << "c=IN IP4 0.0.0.0\r\n";
_printer << "t=0 0\r\n";
}
if (dur_sec <= 0) {
//直播
_printer << "a=range:npt=now-\r\n";
}
else {
//点播
_dur_sec = dur_sec;
_printer << "a=range:npt=0-" << dur_sec << "\r\n";
}
_printer << "a=control:*\r\n";
}
}//namespace mediakit
namespace toolkit {

View File

@ -15,10 +15,11 @@
#include <string>
#include <memory>
#include <unordered_map>
#include "Util/util.h"
#include "Common/config.h"
#include "Common/macros.h"
#include "Extension/Frame.h"
namespace toolkit {
class Socket;
}
namespace mediakit {
@ -312,30 +313,7 @@ public:
*/
TitleSdp(float dur_sec = 0,
const std::map<std::string, std::string> &header = std::map<std::string, std::string>(),
int version = 0) : Sdp(0, 0) {
_printer << "v=" << version << "\r\n";
if (!header.empty()) {
for (auto &pr : header) {
_printer << pr.first << "=" << pr.second << "\r\n";
}
} else {
_printer << "o=- 0 0 IN IP4 0.0.0.0\r\n";
_printer << "s=Streamed by " << kServerName << "\r\n";
_printer << "c=IN IP4 0.0.0.0\r\n";
_printer << "t=0 0\r\n";
}
if (dur_sec <= 0) {
//直播
_printer << "a=range:npt=now-\r\n";
} else {
//点播
_dur_sec = dur_sec;
_printer << "a=range:npt=0-" << dur_sec << "\r\n";
}
_printer << "a=control:*\r\n";
}
int version = 0);
std::string getSdp() const override {
return _printer;
@ -357,7 +335,7 @@ private:
//创建rtp over tcp4个字节的头
toolkit::Buffer::Ptr makeRtpOverTcpPrefix(uint16_t size, uint8_t interleaved);
//创建rtp-rtcp端口对
void makeSockPair(std::pair<toolkit::Socket::Ptr, toolkit::Socket::Ptr> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true);
void makeSockPair(std::pair<std::shared_ptr<toolkit::Socket>, std::shared_ptr<toolkit::Socket>> &pair, const std::string &local_ip, bool re_use_port = false, bool is_udp = true);
//十六进制方式打印ssrc
std::string printSSRC(uint32_t ui32Ssrc);

View File

@ -18,8 +18,7 @@
#include <unordered_map>
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "RtpCodec.h"
#include "Util/logger.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#include "Util/TimeTicker.h"
#include "Util/ResourcePool.h"

View File

@ -12,6 +12,8 @@
#define ZLMEDIAKIT_TSMEDIASOURCE_H
#include "Common/MediaSource.h"
#include "Common/PacketCache.h"
#include "Util/RingBuffer.h"
#define TS_GOP_SIZE 512

View File

@ -9,7 +9,7 @@
#include "Network/Session.h"
#include "Poller/EventPoller.h"
#include "Poller/Timer.h"
#include "Common/Stamp.h"
#include "Common.hpp"
#include "NackContext.hpp"
#include "Packet.hpp"

View File

@ -1,6 +1,6 @@
#include "Util/util.h"
#include <memory>
#include "Common/Parser.h"
#include "SrtTransportImp.hpp"
namespace SRT {

View File

@ -10,6 +10,7 @@
#include "Sdp.h"
#include "Rtsp/Rtsp.h"
#include "Common/config.h"
#include <cinttypes>
using namespace std;