diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 9a545d7d..e5535a71 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 9a545d7d09fc4b570c1d29a798622c7923111735 +Subproject commit e5535a7164f55eb9062213f40ddc68c0294e6f57 diff --git a/CMakeLists.txt b/CMakeLists.txt index b9dbfa64..96e79890 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -193,6 +193,12 @@ if(UNIX) "-Wall;-Wextra" "-Wno-unused-function;-Wno-unused-parameter;-Wno-unused-variable" "-Wno-error=extra;-Wno-error=missing-field-initializers;-Wno-error=type-limits") + + if("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") + set(COMPILE_OPTIONS_DEFAULT ${COMPILE_OPTIONS_DEFAULT} "-g3") + else() + set(COMPILE_OPTIONS_DEFAULT ${COMPILE_OPTIONS_DEFAULT} "-g0") + endif() elseif(WIN32) if (MSVC) set(COMPILE_OPTIONS_DEFAULT @@ -364,7 +370,7 @@ if(ENABLE_JEMALLOC_STATIC) set(ENABLE_JEMALLOC_STAT OFF) endif () include(Jemalloc) - include_directories(SYSTEM ${DEP_ROOT_DIR}/${JEMALLOC_NAME}/include/jemalloc) + include_directories(SYSTEM ${DEP_ROOT_DIR}/${JEMALLOC_NAME}/include) link_directories(${DEP_ROOT_DIR}/${JEMALLOC_NAME}/lib) # 用于影响后续查找过程 # Used to affect subsequent lookup process diff --git a/conf/config.ini b/conf/config.ini index d1d8246c..302fb295 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -135,6 +135,8 @@ segDur=2 #m3u8索引中,hls保留切片个数(实际保留切片个数大2~3个) #如果设置为0,则不删除切片,而是保存为点播 segNum=3 +#HLS切片延迟个数,大于0将生成hls_delay.m3u8文件,0则不生成 +segDelay=0 #HLS切片从m3u8文件中移除后,继续保留在磁盘上的个数 segRetain=5 #是否广播 hls切片(ts/fmp4)完成通知(on_record_ts) @@ -289,7 +291,7 @@ sslport=0 # rtmp是否直接代理模式 directProxy=1 #h265 rtmp打包采用增强型rtmp标准还是国内拓展标准 -enhanced=1 +enhanced=0 [rtp] #音频mtu大小,该参数限制rtp最大字节数,推荐不要超过1400 diff --git a/ext-codec/H264Rtmp.cpp b/ext-codec/H264Rtmp.cpp index 05098779..30bf10eb 100644 --- a/ext-codec/H264Rtmp.cpp +++ b/ext-codec/H264Rtmp.cpp @@ -14,16 +14,24 @@ using namespace std; using namespace toolkit; +#define CHECK_RET(...) \ + try { \ + CHECK(__VA_ARGS__); \ + } catch (AssertFailedException & ex) { \ + WarnL << ex.what(); \ + return; \ + } + namespace mediakit { void H264RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt) { if (pkt->isConfigFrame()) { - CHECK(pkt->size() > 5); + CHECK_RET(pkt->size() > 5); getTrack()->setExtraData((uint8_t *)pkt->data() + 5, pkt->size() - 5); return; } - CHECK(pkt->size() > 9); + CHECK_RET(pkt->size() > 9); uint8_t *cts_ptr = (uint8_t *)(pkt->buffer.data() + 2); int32_t cts = (((cts_ptr[0] << 16) | (cts_ptr[1] << 8) | (cts_ptr[2])) + 0xff800000) ^ 0xff800000; auto pts = pkt->time_stamp + cts; diff --git a/ext-codec/H265Rtmp.cpp b/ext-codec/H265Rtmp.cpp index 9799849d..2b88795d 100644 --- a/ext-codec/H265Rtmp.cpp +++ b/ext-codec/H265Rtmp.cpp @@ -18,6 +18,14 @@ using namespace std; using namespace toolkit; +#define CHECK_RET(...) \ + try { \ + CHECK(__VA_ARGS__); \ + } catch (AssertFailedException & ex) { \ + WarnL << ex.what(); \ + return; \ + } + namespace mediakit { void H265RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt) { @@ -44,7 +52,7 @@ void H265RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt) { auto data = (uint8_t *)pkt->data() + RtmpPacketInfo::kEnhancedRtmpHeaderSize; auto size = pkt->size() - RtmpPacketInfo::kEnhancedRtmpHeaderSize; auto pts = pkt->time_stamp; - CHECK(size > 3); + CHECK_RET(size > 3); if (RtmpPacketType::PacketTypeCodedFrames == _info.video.pkt_type) { // SI24 = [CompositionTime Offset] int32_t cts = (((data[0] << 16) | (data[1] << 8) | (data[2])) + 0xff800000) ^ 0xff800000; @@ -52,7 +60,7 @@ void H265RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt) { data += 3; size -= 3; } - CHECK(size > 4); + CHECK_RET(size > 4); splitFrame(data, size, pkt->time_stamp, pts); break; } @@ -63,12 +71,12 @@ void H265RtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt) { // 国内扩展(12) H265 rtmp if (pkt->isConfigFrame()) { - CHECK(pkt->size() > 5); + CHECK_RET(pkt->size() > 5); getTrack()->setExtraData((uint8_t *)pkt->data() + 5, pkt->size() - 5); return; } - CHECK(pkt->size() > 9); + CHECK_RET(pkt->size() > 9); uint8_t *cts_ptr = (uint8_t *)(pkt->buffer.data() + 2); int32_t cts = (((cts_ptr[0] << 16) | (cts_ptr[1] << 8) | (cts_ptr[2])) + 0xff800000) ^ 0xff800000; auto pts = pkt->time_stamp + cts; diff --git a/package/rpm/ZLMediaKit.spec b/package/rpm/ZLMediaKit.spec index 9ff902d6..f26aba10 100644 --- a/package/rpm/ZLMediaKit.spec +++ b/package/rpm/ZLMediaKit.spec @@ -18,7 +18,7 @@ %bcond_with cxx_api Name: ZLMediaKit -Version: 5.0.0 +Version: 8.0.0 Release: 1%{?dist} Summary: A lightweight, high performance and stable stream server and client framework based on C++11. diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 44da3e5c..aa3df337 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -944,25 +944,29 @@ void installWebApi() { //批量断开tcp连接,比如说可以断开rtsp、rtmp播放器等 //测试url http://127.0.0.1/index/api/kick_sessions?local_port=1935 - api_regist("/index/api/kick_sessions",[](API_ARGS_MAP){ + api_regist("/index/api/kick_sessions", [](API_ARGS_MAP) { CHECK_SECRET(); uint16_t local_port = allArgs["local_port"].as(); string peer_ip = allArgs["peer_ip"]; size_t count_hit = 0; list session_list; - SessionMap::Instance().for_each_session([&](const string &id,const Session::Ptr &session){ - if(local_port != 0 && local_port != session->get_local_port()){ + SessionMap::Instance().for_each_session([&](const string &id, const Session::Ptr &session) { + if (local_port != 0 && local_port != session->get_local_port()) { return; } - if(!peer_ip.empty() && peer_ip != session->get_peer_ip()){ + if (!peer_ip.empty() && peer_ip != session->get_peer_ip()) { + return; + } + if (session->getIdentifier() == sender.getIdentifier()) { + // 忽略本http链接 return; } session_list.emplace_back(session); ++count_hit; }); - for(auto &session : session_list){ + for (auto &session : session_list) { session->safeShutdown(); } val["count_hit"] = (Json::UInt64)count_hit; diff --git a/server/main.cpp b/server/main.cpp index 1b2f4b1a..c946beab 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -346,6 +346,11 @@ int start_main(int argc,char *argv[]) { uint16_t srtPort = mINI::Instance()[SRT::kPort]; #endif //defined(ENABLE_SRT) + installWebApi(); + InfoL << "已启动http api 接口"; + installWebHook(); + InfoL << "已启动http hook 接口"; + try { auto &secret = mINI::Instance()[API::kSecret]; if (secret == "035c73f7-bb6b-4889-a715-d9eb2d1925cc" || secret.empty()) { @@ -403,11 +408,6 @@ int start_main(int argc,char *argv[]) { return -1; } - installWebApi(); - InfoL << "已启动http api 接口"; - installWebHook(); - InfoL << "已启动http hook 接口"; - //设置退出信号处理函数 static semaphore sem; signal(SIGINT, [](int) { diff --git a/src/Codec/Transcode.cpp b/src/Codec/Transcode.cpp index aaa641fe..daffe5ac 100644 --- a/src/Codec/Transcode.cpp +++ b/src/Codec/Transcode.cpp @@ -244,8 +244,8 @@ AVFrame *FFmpegFrame::get() const { void FFmpegFrame::fillPicture(AVPixelFormat target_format, int target_width, int target_height) { assert(_data == nullptr); - _data = new char[av_image_get_buffer_size(target_format, target_width, target_height, 1)]; - av_image_fill_arrays(_frame->data, _frame->linesize, (uint8_t *) _data, target_format, target_width, target_height,1); + _data = new char[av_image_get_buffer_size(target_format, target_width, target_height, 32)]; + av_image_fill_arrays(_frame->data, _frame->linesize, (uint8_t *) _data, target_format, target_width, target_height, 32); } /////////////////////////////////////////////////////////////////////////// @@ -673,7 +673,7 @@ FFmpegFrame::Ptr FFmpegSws::inputFrame(const FFmpegFrame::Ptr &frame, int &ret, auto out = std::make_shared(); if (!out->get()->data[0]) { if (data) { - av_image_fill_arrays(out->get()->data, out->get()->linesize, data, _target_format, target_width, target_height, 1); + av_image_fill_arrays(out->get()->data, out->get()->linesize, data, _target_format, target_width, target_height, 32); } else { out->fillPicture(_target_format, target_width, target_height); } diff --git a/src/Common/MediaSink.cpp b/src/Common/MediaSink.cpp index 3fc5a9ea..837024af 100644 --- a/src/Common/MediaSink.cpp +++ b/src/Common/MediaSink.cpp @@ -37,6 +37,7 @@ bool MediaSink::addTrack(const Track::Ptr &track_in) { } // 克隆Track,只拷贝其数据,不拷贝其数据转发关系 auto track = track_in->clone(); + CHECK(track, "Clone track failed: ", track_in->getCodecName()); auto index = track->getIndex(); if (!_track_map.emplace(index, std::make_pair(track, false)).second) { WarnL << "Already add a same track: " << track->getIndex() << ", codec: " << track->getCodecName(); diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 82b3feca..357a9689 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -242,7 +242,7 @@ static onceToken token([]() { mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15; mINI::Instance()[kDirectProxy] = 1; - mINI::Instance()[kEnhanced] = 1; + mINI::Instance()[kEnhanced] = 0; }); } // namespace Rtmp @@ -307,6 +307,7 @@ namespace Hls { const string kSegmentDuration = HLS_FIELD "segDur"; const string kSegmentNum = HLS_FIELD "segNum"; const string kSegmentKeep = HLS_FIELD "segKeep"; +const string kSegmentDelay = HLS_FIELD "segDelay"; const string kSegmentRetain = HLS_FIELD "segRetain"; const string kFileBufSize = HLS_FIELD "fileBufSize"; const string kBroadcastRecordTs = HLS_FIELD "broadcastRecordTs"; @@ -317,6 +318,7 @@ static onceToken token([]() { mINI::Instance()[kSegmentDuration] = 2; mINI::Instance()[kSegmentNum] = 3; mINI::Instance()[kSegmentKeep] = false; + mINI::Instance()[kSegmentDelay] = 0; mINI::Instance()[kSegmentRetain] = 5; mINI::Instance()[kFileBufSize] = 64 * 1024; mINI::Instance()[kBroadcastRecordTs] = false; diff --git a/src/Common/config.h b/src/Common/config.h index 7f9073d9..a6d7af4d 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -349,6 +349,8 @@ extern const std::string kSegmentDuration; extern const std::string kSegmentNum; // 如果设置为0,则不保留切片,设置为1则一直保留切片 extern const std::string kSegmentKeep; +// HLS切片延迟个数,大于0将生成hls_delay.m3u8文件,0则不生成 +extern const std::string kSegmentDelay; // HLS切片从m3u8文件中移除后,继续保留在磁盘上的个数 extern const std::string kSegmentRetain; // HLS文件写缓存大小 diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index 9904a784..49feeb94 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -197,8 +197,8 @@ AMFValue Factory::getAmfByCodecId(CodecId codecId) { Frame::Ptr Factory::getFrameFromPtr(CodecId codec, const char *data, size_t bytes, uint64_t dts, uint64_t pts) { auto it = s_plugins.find(codec); if (it == s_plugins.end()) { - WarnL << "Unsupported codec: " << getCodecName(codec); - return nullptr; + // 创建不支持codec的frame + return std::make_shared(codec, (char *)data, bytes, dts, pts); } return it->second->getFrameFromPtr(data, bytes, dts, pts); } diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index c58ea84b..6daafda5 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -69,12 +69,16 @@ CodecId getCodecByMovId(int object_id) { if (object_id == MOV_OBJECT_NONE) { return CodecInvalid; } - switch (object_id) { -#define XX(name, type, value, str, mpeg_id, mp4_id) case mp4_id : return name; - CODEC_MAP(XX) + +#define XX(name, type, value, str, mpeg_id, mp4_id) { mp4_id, name }, + static map s_map = { CODEC_MAP(XX) }; #undef XX - default : WarnL << "Unsupported mov: " << object_id; return CodecInvalid; + auto it = s_map.find(object_id); + if (it == s_map.end()) { + WarnL << "Unsupported mov: " << object_id; + return CodecInvalid; } + return it->second; } #endif @@ -89,17 +93,20 @@ int getMpegIdByCodec(CodecId codec) { } CodecId getCodecByMpegId(int mpeg_id) { - if (mpeg_id == PSI_STREAM_RESERVED) { + if (mpeg_id == PSI_STREAM_RESERVED || mpeg_id == 0xBD) { + // 海康的 PS 流中会有0xBD 的包 return CodecInvalid; } - switch (mpeg_id) { -#define XX(name, type, value, str, mpeg_id, mp4_id) case mpeg_id : return name; - CODEC_MAP(XX) + +#define XX(name, type, value, str, mpeg_id, mp4_id) { mpeg_id, name }, + static map s_map = { CODEC_MAP(XX) }; #undef XX - // 海康的 PS 流中会有0xBD 的包 - case 0xBD: return CodecInvalid; - default : WarnL << "Unsupported mpeg: " << mpeg_id; return CodecInvalid; + auto it = s_map.find(mpeg_id); + if (it == s_map.end()) { + WarnL << "Unsupported mpeg: " << mpeg_id; + return CodecInvalid; } + return it->second; } #endif diff --git a/src/Extension/Track.h b/src/Extension/Track.h index 8264b3a8..ba101282 100644 --- a/src/Extension/Track.h +++ b/src/Extension/Track.h @@ -129,8 +129,8 @@ public: _fps = fps; } - int getVideoHeight() const override { return _width; } - int getVideoWidth() const override { return _height; } + int getVideoWidth() const override { return _width; } + int getVideoHeight() const override { return _height; } float getVideoFps() const override { return _fps; } bool ready() const override { return true; } diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 38eb5473..a593cc62 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -124,29 +124,18 @@ ssize_t HttpSession::onRecvHeader(const char *header, size_t len) { } //// body size明确指定且小于最大值的情况 //// - auto body = std::make_shared(); - // 预留一定的内存buffer,防止频繁的内存拷贝 - body->reserve(content_len); - - _on_recv_body = [this, body, content_len, it](const char *data, size_t len) mutable { - body->append(data, len); - if (body->size() < content_len) { - // 未收满数据 - return true; - } - + _on_recv_body = [this, it](const char *data, size_t len) mutable { // 收集body完毕 - _parser.setContent(std::move(*body)); + _parser.setContent(std::string(data, len)); (this->*(it->second))(); _parser.clear(); - // 后续是header - setContentLen(0); + // _on_recv_body置空 return false; }; - // 声明后续都是body;Http body在本对象缓冲,不通过HttpRequestSplitter保存 - return -1; + // 声明body长度,通过HttpRequestSplitter缓存然后一次性回调到_on_recv_body + return content_len; } void HttpSession::onRecvContent(const char *data, size_t len) { diff --git a/src/Record/HlsMaker.cpp b/src/Record/HlsMaker.cpp index 2b074e1b..11d5190a 100644 --- a/src/Record/HlsMaker.cpp +++ b/src/Record/HlsMaker.cpp @@ -24,15 +24,40 @@ HlsMaker::HlsMaker(bool is_fmp4, float seg_duration, uint32_t seg_number, bool s _seg_keep = seg_keep; } -void HlsMaker::makeIndexFile(bool eof) { +void HlsMaker::makeIndexFile(bool include_delay, bool eof) { + GET_CONFIG(uint32_t, segDelay, Hls::kSegmentDelay); + GET_CONFIG(uint32_t, segRetain, Hls::kSegmentRetain); + std::deque> temp(_seg_dur_list); + if (!include_delay) { + while (temp.size() > _seg_number) { + temp.pop_front(); + } + } int maxSegmentDuration = 0; - for (auto &tp : _seg_dur_list) { + for (auto &tp : temp) { int dur = std::get<0>(tp); if (dur > maxSegmentDuration) { maxSegmentDuration = dur; } } - auto index_seq = _seg_number ? (_file_index > _seg_number ? _file_index - _seg_number : 0LL) : 0LL; + uint64_t index_seq; + if (_seg_number) { + if (include_delay) { + if (_file_index > _seg_number + segDelay) { + index_seq = _file_index - _seg_number - segDelay; + } else { + index_seq = 0LL; + } + } else { + if (_file_index > _seg_number) { + index_seq = _file_index - _seg_number; + } else { + index_seq = 0LL; + } + } + } else { + index_seq = 0LL; + } string index_str; index_str.reserve(2048); @@ -50,7 +75,7 @@ void HlsMaker::makeIndexFile(bool eof) { } stringstream ss; - for (auto &tp : _seg_dur_list) { + for (auto &tp : temp) { ss << "#EXTINF:" << std::setprecision(3) << std::get<0>(tp) / 1000.0 << ",\n" << std::get<1>(tp) << "\n"; } index_str += ss.str(); @@ -58,7 +83,7 @@ void HlsMaker::makeIndexFile(bool eof) { if (eof) { index_str += "#EXT-X-ENDLIST\n"; } - onWriteHls(index_str); + onWriteHls(index_str, include_delay); } void HlsMaker::inputInitSegment(const char *data, size_t len) { @@ -91,12 +116,13 @@ void HlsMaker::inputData(const char *data, size_t len, uint64_t timestamp, bool } void HlsMaker::delOldSegment() { + GET_CONFIG(uint32_t, segDelay, Hls::kSegmentDelay); if (_seg_number == 0) { //如果设置为保留0个切片,则认为是保存为点播 return; } //在hls m3u8索引文件中,我们保存的切片个数跟_seg_number相关设置一致 - if (_file_index > _seg_number) { + if (_file_index > _seg_number + segDelay) { _seg_dur_list.pop_front(); } //如果设置为一直保存,就不删除 @@ -105,8 +131,8 @@ void HlsMaker::delOldSegment() { } GET_CONFIG(uint32_t, segRetain, Hls::kSegmentRetain); //但是实际保存的切片个数比m3u8所述多若干个,这样做的目的是防止播放器在切片删除前能下载完毕 - if (_file_index > _seg_number + segRetain) { - onDelSegment(_file_index - _seg_number - segRetain - 1); + if (_file_index > _seg_number + segDelay + segRetain) { + onDelSegment(_file_index - _seg_number - segDelay - segRetain - 1); } } @@ -125,6 +151,7 @@ void HlsMaker::addNewSegment(uint64_t stamp) { } void HlsMaker::flushLastSegment(bool eof){ + GET_CONFIG(uint32_t, segDelay, Hls::kSegmentDelay); if (_last_file_name.empty()) { //不存在上个切片 return; @@ -139,7 +166,11 @@ void HlsMaker::flushLastSegment(bool eof){ //先flush ts切片,否则可能存在ts文件未写入完毕就被访问的情况 onFlushLastSegment(seg_dur); //然后写m3u8文件 - makeIndexFile(eof); + makeIndexFile(false, eof); + //写入切片延迟的m3u8文件 + if (segDelay) { + makeIndexFile(true, eof); + } } bool HlsMaker::isLive() const { diff --git a/src/Record/HlsMaker.h b/src/Record/HlsMaker.h index c06117e7..a95e2296 100644 --- a/src/Record/HlsMaker.h +++ b/src/Record/HlsMaker.h @@ -96,7 +96,7 @@ protected: /** * 写m3u8文件回调 */ - virtual void onWriteHls(const std::string &data) = 0; + virtual void onWriteHls(const std::string &data, bool include_delay) = 0; /** * 上一个 ts 切片写入完成, 可在这里进行通知处理 @@ -115,7 +115,7 @@ private: * 生成m3u8文件 * @param eof true代表点播 */ - void makeIndexFile(bool eof = false); + void makeIndexFile(bool include_delay, bool eof = false); /** * 删除旧的ts切片 diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index 7fc2b000..e987f820 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -21,11 +21,20 @@ using namespace toolkit; namespace mediakit { +std::string getDelayPath(const std::string& originalPath) { + std::size_t pos = originalPath.find(".m3u8"); + if (pos != std::string::npos) { + return originalPath.substr(0, pos) + "_delay.m3u8"; + } + return originalPath; +} + HlsMakerImp::HlsMakerImp(bool is_fmp4, const string &m3u8_file, const string ¶ms, uint32_t bufSize, float seg_duration, uint32_t seg_number, bool seg_keep) : HlsMaker(is_fmp4, seg_duration, seg_number, seg_keep) { _poller = EventPollerPool::Instance().getPoller(); _path_prefix = m3u8_file.substr(0, m3u8_file.rfind('/')); _path_hls = m3u8_file; + _path_hls_delay = getDelayPath(m3u8_file); _params = params; _buf_size = bufSize; _file_buf.reset(new char[bufSize], [](char *ptr) { delete[] ptr; }); @@ -62,6 +71,7 @@ void HlsMakerImp::clearCache(bool immediately, bool eof) { { std::list lst; lst.emplace_back(_path_hls); + lst.emplace_back(_path_hls_delay); if (!_path_init.empty()) { lst.emplace_back(_path_init); } @@ -146,16 +156,17 @@ void HlsMakerImp::onWriteSegment(const char *data, size_t len) { } } -void HlsMakerImp::onWriteHls(const std::string &data) { - auto hls = makeFile(_path_hls); +void HlsMakerImp::onWriteHls(const std::string &data, bool include_delay) { + auto path = include_delay ? _path_hls_delay : _path_hls; + auto hls = makeFile(path); if (hls) { fwrite(data.data(), data.size(), 1, hls.get()); hls.reset(); - if (_media_src) { + if (_media_src && !include_delay) { _media_src->setIndexFile(data); } } else { - WarnL << "Create hls file failed," << _path_hls << " " << get_uv_errmsg(); + WarnL << "Create hls file failed," << path << " " << get_uv_errmsg(); } } diff --git a/src/Record/HlsMakerImp.h b/src/Record/HlsMakerImp.h index 7d268a53..b3bf77b9 100644 --- a/src/Record/HlsMakerImp.h +++ b/src/Record/HlsMakerImp.h @@ -49,7 +49,7 @@ protected: void onDelSegment(uint64_t index) override; void onWriteInitSegment(const char *data, size_t len) override; void onWriteSegment(const char *data, size_t len) override; - void onWriteHls(const std::string &data) override; + void onWriteHls(const std::string &data, bool include_delay) override; void onFlushLastSegment(uint64_t duration_ms) override; private: @@ -60,6 +60,7 @@ private: int _buf_size; std::string _params; std::string _path_hls; + std::string _path_hls_delay; std::string _path_init; std::string _path_prefix; RecordInfo _info; diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 37c6cfd7..5d40be1d 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -104,7 +104,7 @@ bool MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { case CodecH264: case CodecH265: { // 这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - track.merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + track.merger.inputFrame(frame, [this, &track](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { int64_t dts_out, pts_out; track.stamp.revise(dts, pts, dts_out, pts_out); mp4_writer_write(_mov_writter.get(), track.track_id, buffer->data(), buffer->size(), pts_out, dts_out, have_idr ? MOV_AV_FLAG_KEYFREAME : 0); diff --git a/src/Record/MPEG.cpp b/src/Record/MPEG.cpp index ef844277..bca086dc 100644 --- a/src/Record/MPEG.cpp +++ b/src/Record/MPEG.cpp @@ -55,7 +55,7 @@ bool MpegMuxer::inputFrame(const Frame::Ptr &frame) { case CodecH264: case CodecH265: { // 这里的代码逻辑是让SPS、PPS、IDR这些时间戳相同的帧打包到一起当做一个帧处理, - return track.merger.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { + return track.merger.inputFrame(frame, [this, &track](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool have_idr) { _key_pos = have_idr; // 取视频时间戳为TS的时间戳 _timestamp = dts; diff --git a/src/Rtcp/RtcpFCI.cpp b/src/Rtcp/RtcpFCI.cpp index 9683f46f..843798df 100644 --- a/src/Rtcp/RtcpFCI.cpp +++ b/src/Rtcp/RtcpFCI.cpp @@ -210,7 +210,7 @@ string FCI_NACK::dumpString() const { } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - +#pragma pack(push, 1) class RunLengthChunk { public: static size_t constexpr kSize = 2; @@ -241,6 +241,7 @@ public: // 打印本对象 string dumpString() const; }; +#pragma pack(pop) RunLengthChunk::RunLengthChunk(SymbolStatus status, uint16_t run_length) { type = 0; @@ -261,7 +262,7 @@ string RunLengthChunk::dumpString() const { } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - +#pragma pack(push, 1) class StatusVecChunk { public: static size_t constexpr kSize = 2; @@ -292,6 +293,7 @@ public: // 打印本对象 string dumpString() const; }; +#pragma pack(pop) StatusVecChunk::StatusVecChunk(bool symbol_bit, const vector &status) { CHECK(status.size() << symbol_bit <= 14); diff --git a/src/Rtcp/RtcpFCI.h b/src/Rtcp/RtcpFCI.h index 10d2f558..43c45487 100644 --- a/src/Rtcp/RtcpFCI.h +++ b/src/Rtcp/RtcpFCI.h @@ -14,6 +14,7 @@ #include "Rtcp.h" namespace mediakit { +#pragma pack(push, 1) /////////////////////////////////////////// PSFB //////////////////////////////////////////////////// @@ -375,6 +376,6 @@ private: // feedback packet count,反馈包号,本包是第几个transport-cc包,每次加1 | uint8_t fb_pkt_count; }; - +#pragma pack(pop) } // namespace mediakit #endif // ZLMEDIAKIT_RTCPFCI_H diff --git a/src/Rtp/Decoder.cpp b/src/Rtp/Decoder.cpp index dacd3ab5..ca4b20b7 100644 --- a/src/Rtp/Decoder.cpp +++ b/src/Rtp/Decoder.cpp @@ -111,7 +111,7 @@ void DecoderImp::onDecode(int stream, int codecid, int flags, int64_t pts, int64 onTrack(stream, Factory::getTrackByCodecId(codec, 8000, 1, 16)); } if (!ref.first) { - WarnL << "not support codec :" << getCodecName(codec); + WarnL << "Unsupported codec :" << getCodecName(codec); return; } auto frame = Factory::getFrameFromPtr(codec, (char *)data, bytes, dts, pts); @@ -119,7 +119,7 @@ void DecoderImp::onDecode(int stream, int codecid, int flags, int64_t pts, int64 onFrame(stream, frame); return; } - ref.second.inputFrame(frame, [&](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool) { + ref.second.inputFrame(frame, [this, stream, codec](uint64_t dts, uint64_t pts, const Buffer::Ptr &buffer, bool) { onFrame(stream, Factory::getFrameFromBuffer(codec, buffer, dts, pts)); }); } diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 85f0c2fb..40db50f3 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -46,11 +46,7 @@ RtpSession::RtpSession(const Socket::Ptr &sock) } } -RtpSession::~RtpSession() { - if (_process) { - RtpSelector::Instance().delProcess(_stream_id, _process.get()); - } -} +RtpSession::~RtpSession() = default; void RtpSession::onRecv(const Buffer::Ptr &data) { if (_is_udp) { @@ -62,6 +58,9 @@ void RtpSession::onRecv(const Buffer::Ptr &data) { void RtpSession::onError(const SockException &err) { WarnP(this) << _stream_id << " " << err; + if (_process) { + RtpSelector::Instance().delProcess(_stream_id, _process.get()); + } } void RtpSession::onManager() { diff --git a/tests/test_bench_forward.cpp b/tests/test_bench_forward.cpp index eb6df202..a71994d1 100644 --- a/tests/test_bench_forward.cpp +++ b/tests/test_bench_forward.cpp @@ -180,7 +180,7 @@ int main(int argc, char *argv[]) { auto pusher = std::make_shared(src); pusher->setOnCreateSocket([](const EventPoller::Ptr &poller) { //socket关闭互斥锁,提高性能 - return std::make_shared(poller, false); + return Socket::createSocket(poller, false); }); //设置推流失败监听 pusher->setOnPublished([&mtx, &pusher_map, index](const SockException &ex) { diff --git a/tests/test_bench_pull.cpp b/tests/test_bench_pull.cpp index c16de46a..dd455f8e 100644 --- a/tests/test_bench_pull.cpp +++ b/tests/test_bench_pull.cpp @@ -123,7 +123,7 @@ int main(int argc, char *argv[]) { auto tag = player.get(); player->setOnCreateSocket([](const EventPoller::Ptr &poller) { //socket关闭互斥锁,提高性能 - return std::make_shared(poller, false); + return Socket::createSocket(poller, false); }); //设置播放失败监听 player->setOnPlayResult([&mtx, &player_map, tag](const SockException &ex) { diff --git a/tests/test_bench_push.cpp b/tests/test_bench_push.cpp index 604ad76f..ee07f0c2 100644 --- a/tests/test_bench_push.cpp +++ b/tests/test_bench_push.cpp @@ -166,7 +166,7 @@ int main(int argc, char *argv[]) { auto tag = pusher.get(); pusher->setOnCreateSocket([](const EventPoller::Ptr &poller) { //socket关闭互斥锁,提高性能 - return std::make_shared(poller, false); + return Socket::createSocket(poller, false); }); //设置推流失败监听 pusher->setOnPublished([&mtx, &pusher_map, tag](const SockException &ex) { diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 5bdffa12..9bb7e4bc 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -10,11 +10,8 @@ #include #include -#include "Util/MD5.h" -#include "Util/File.h" -#include "Util/logger.h" -#include "Util/SSLBox.h" #include "Util/util.h" +#include "Util/logger.h" #include "Network/TcpServer.h" #include "Common/config.h" #include "Rtsp/RtspSession.h" @@ -29,102 +26,102 @@ using namespace mediakit; static semaphore sem; #if defined(ENABLE_RTPPROXY) -static bool loadFile(const char *path, const EventPoller::Ptr &poller){ - FILE *fp = fopen(path, "rb"); +static bool loadFile(const char *path, const EventPoller::Ptr &poller) { + std::shared_ptr fp(fopen(path, "rb"), [](FILE *fp) { + sem.post(); + if (fp) { + fclose(fp); + } + }); if (!fp) { WarnL << "open file failed:" << path; return false; } - uint64_t timeStamp_last = 0; - uint16_t len; - char rtp[0xFFFF]; struct sockaddr_storage addr; memset(&addr, 0, sizeof(addr)); addr.ss_family = AF_INET; auto sock = Socket::createSocket(poller); - size_t total_size = 0; - RtpProcess::Ptr process; - uint32_t ssrc = 0; - while (true) { - if (2 != fread(&len, 1, 2, fp)) { - WarnL; - break; - } - len = ntohs(len); - if (len < 12 || len > sizeof(rtp)) { - WarnL << len; - break; - } + auto process = RtpSelector::Instance().getProcess("test", true); - if (len != fread(rtp, 1, len, fp)) { - WarnL; - break; - } - total_size += len; - uint64_t timeStamp = 0; - - if (!process) { - if (!RtpSelector::getSSRC(rtp, len, ssrc)) { - WarnL << "get ssrc from rtp failed:" << len; - return false; + uint64_t stamp_last = 0; + auto total_size = std::make_shared(0); + auto do_read = [fp, total_size, sock, addr, process, stamp_last]() mutable -> int { + uint16_t len; + char rtp[0xFFFF]; + while (true) { + if (2 != fread(&len, 1, 2, fp.get())) { + WarnL << "Read rtp size failed"; + // 重新播放 + fseek(fp.get(), 0, SEEK_SET); + return 1; } - process = RtpSelector::Instance().getProcess(printSSRC(ssrc), true); - } - if (process) { + len = ntohs(len); + if (len < 12 || len > sizeof(rtp)) { + WarnL << "Invalid rtp size: " << len; + return 0; + } + + if (len != fread(rtp, 1, len, fp.get())) { + WarnL << "Read rtp data failed"; + return 0; + } + (*total_size) += len; + uint64_t stamp = 0; try { - process->inputRtp(true, sock, rtp, len, (struct sockaddr *)&addr, &timeStamp); - } catch (...) { - RtpSelector::Instance().delProcess(printSSRC(ssrc), process.get()); - throw; + process->inputRtp(true, sock, rtp, len, (struct sockaddr *)&addr, &stamp); + } catch (std::exception &ex) { + WarnL << "Input rtp failed: " << ex.what(); + return 0; + } + + auto diff = stamp - stamp_last; + if (diff < 0 || diff > 500) { + diff = 1; + } + if (diff) { + stamp_last = stamp; + return diff; } } - - auto diff = timeStamp - timeStamp_last; - if (diff > 0 && diff < 500) { - usleep(diff * 1000); - } else { - usleep(1 * 1000); + }; + poller->doDelayTask(1, [do_read, total_size, process]() mutable { + auto ret = do_read(); + if (!ret) { + WarnL << *total_size / 1024 << "KB"; + RtpSelector::Instance().delProcess("test", process.get()); } - timeStamp_last = timeStamp; - } - WarnL << total_size / 1024 << "KB"; - fclose(fp); + return ret; + }); + return true; } -#endif//#if defined(ENABLE_RTPPROXY) +#endif // #if defined(ENABLE_RTPPROXY) -int main(int argc,char *argv[]) { - //设置日志 +int main(int argc, char *argv[]) { + // 设置日志 Logger::Instance().add(std::make_shared("ConsoleChannel")); #if defined(ENABLE_RTPPROXY) - //启动异步日志线程 + // 启动异步日志线程 Logger::Instance().setWriter(std::make_shared()); loadIniConfig((exeDir() + "config.ini").data()); TcpServer::Ptr rtspSrv(new TcpServer()); TcpServer::Ptr rtmpSrv(new TcpServer()); TcpServer::Ptr httpSrv(new TcpServer()); - rtspSrv->start(554);//默认554 - rtmpSrv->start(1935);//默认1935 - httpSrv->start(80);//默认80 - //此处选择是否导出调试文件 -// mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/"; + rtspSrv->start(554); // 默认554 + rtmpSrv->start(1935); // 默认1935 + httpSrv->start(80); // 默认80 + // 此处选择是否导出调试文件 + // mINI::Instance()[RtpProxy::kDumpDir] = "/Users/xzl/Desktop/"; - if (argc == 2){ - auto poller = EventPollerPool::Instance().getPoller(); - poller->async_first([poller,argv](){ - loadFile(argv[1],poller); - sem.post(); - }); + if (argc == 2) { + loadFile(argv[1], EventPollerPool::Instance().getPoller()); sem.wait(); - sleep(1); + } else { + ErrorL << "parameter error."; } - else - ErrorL << "parameter error."; #else ErrorL << "please ENABLE_RTPPROXY and then test"; -#endif//#if defined(ENABLE_RTPPROXY) +#endif // #if defined(ENABLE_RTPPROXY) return 0; } - - diff --git a/webrtc/SctpAssociation.cpp b/webrtc/SctpAssociation.cpp index 84a2c04f..1b5d2782 100644 --- a/webrtc/SctpAssociation.cpp +++ b/webrtc/SctpAssociation.cpp @@ -677,9 +677,9 @@ namespace RTC if (notification->sn_header.sn_length > 0) { static const size_t BufferSize{ 1024 }; - static char buffer[BufferSize]; + thread_local static char buffer[BufferSize]; - uint32_t len = notification->sn_header.sn_length; + uint32_t len = notification->sn_assoc_change.sac_length - sizeof(struct sctp_assoc_change); for (uint32_t i{ 0 }; i < len; ++i) { @@ -745,9 +745,9 @@ namespace RTC if (notification->sn_header.sn_length > 0) { static const size_t BufferSize{ 1024 }; - static char buffer[BufferSize]; + thread_local static char buffer[BufferSize]; - uint32_t len = notification->sn_header.sn_length; + uint32_t len = notification->sn_assoc_change.sac_length - sizeof(struct sctp_assoc_change); for (uint32_t i{ 0 }; i < len; ++i) { @@ -786,7 +786,7 @@ namespace RTC case SCTP_REMOTE_ERROR: { static const size_t BufferSize{ 1024 }; - static char buffer[BufferSize]; + thread_local static char buffer[BufferSize]; uint32_t len = notification->sn_remote_error.sre_length - sizeof(struct sctp_remote_error); @@ -822,7 +822,7 @@ namespace RTC case SCTP_SEND_FAILED_EVENT: { static const size_t BufferSize{ 1024 }; - static char buffer[BufferSize]; + thread_local static char buffer[BufferSize]; uint32_t len = notification->sn_send_failed_event.ssfe_length - sizeof(struct sctp_send_failed_event); @@ -1004,4 +1004,4 @@ namespace RTC } // namespace RTC -#endif //ENABLE_SCTP \ No newline at end of file +#endif //ENABLE_SCTP diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 685e9f89..7fc8816e 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -59,8 +59,8 @@ static onceToken token([]() { mINI::Instance()[kTimeOutSec] = 15; mINI::Instance()[kExternIP] = ""; mINI::Instance()[kRembBitRate] = 0; - mINI::Instance()[kPort] = 0; - mINI::Instance()[kTcpPort] = 0; + mINI::Instance()[kPort] = 8000; + mINI::Instance()[kTcpPort] = 8000; mINI::Instance()[kStartBitrate] = 0; mINI::Instance()[kMaxBitrate] = 0; @@ -1380,4 +1380,4 @@ static onceToken s_rtc_auto_register([]() { }); }); -}// namespace mediakit \ No newline at end of file +}// namespace mediakit