This commit is contained in:
xingqiao 2024-03-11 14:06:42 +08:00
commit 7df189c555
10 changed files with 179 additions and 194 deletions

View File

@ -33,12 +33,11 @@ static TcpServer::Ptr shell_server;
#ifdef ENABLE_RTPPROXY #ifdef ENABLE_RTPPROXY
#include "Rtp/RtpServer.h" #include "Rtp/RtpServer.h"
static std::shared_ptr<RtpServer> rtpServer; static RtpServer::Ptr rtpServer;
#endif #endif
#ifdef ENABLE_WEBRTC #ifdef ENABLE_WEBRTC
#include "../webrtc/WebRtcSession.h" #include "../webrtc/WebRtcSession.h"
#include "../webrtc/WebRtcTransport.h"
static UdpServer::Ptr rtcServer_udp; static UdpServer::Ptr rtcServer_udp;
static TcpServer::Ptr rtcServer_tcp; static TcpServer::Ptr rtcServer_tcp;
#endif #endif

View File

@ -277,6 +277,8 @@ sampleMS=500
fastStart=0 fastStart=0
#MP4点播(rtsp/rtmp/http-flv/ws-flv)是否循环播放文件 #MP4点播(rtsp/rtmp/http-flv/ws-flv)是否循环播放文件
fileRepeat=0 fileRepeat=0
#MP4录制写文件格式是否采用fmp4启用的话断电未完成录制的文件也能正常打开
enableFmp4=0
[rtmp] [rtmp]
#rtmp必须在此时间内完成握手否则服务器会断开链接单位秒 #rtmp必须在此时间内完成握手否则服务器会断开链接单位秒

View File

@ -38,16 +38,12 @@ bool G711RtpEncoder::inputFrame(const Frame::Ptr &frame) {
auto ptr = _cache_frame->data() + _cache_frame->prefixSize(); auto ptr = _cache_frame->data() + _cache_frame->prefixSize();
auto len = _cache_frame->size() - _cache_frame->prefixSize(); auto len = _cache_frame->size() - _cache_frame->prefixSize();
auto remain_size = len; auto remain_size = len;
auto max_size = 160 * _channels * _pkt_dur_ms / 20; // 20 ms per 160 byte size_t max_size = 160 * _channels * _pkt_dur_ms / 20; // 20 ms per 160 byte
uint32_t n = 0; size_t n = 0;
bool mark = true; bool mark = true;
while (remain_size >= max_size) { while (remain_size >= max_size) {
size_t rtp_size; assert(remain_size >= max_size);
if (remain_size >= max_size) { const size_t rtp_size = max_size;
rtp_size = max_size;
} else {
break;
}
n++; n++;
stamp += _pkt_dur_ms; stamp += _pkt_dur_ms;
RtpCodec::inputRtp(getRtpInfo().makeRtp(TrackAudio, ptr, rtp_size, mark, stamp), true); RtpCodec::inputRtp(getRtpInfo().makeRtp(TrackAudio, ptr, rtp_size, mark, stamp), true);

View File

@ -297,22 +297,71 @@ static inline void addHttpListener(){
}); });
} }
template <typename Type>
class ServiceController {
public:
using Pointer = std::shared_ptr<Type>;
std::unordered_map<std::string, Pointer> _map;
mutable std::recursive_mutex _mtx;
void clear() {
decltype(_map) copy;
{
std::lock_guard<std::recursive_mutex> lck(_mtx);
copy.swap(_map);
}
}
size_t erase(const std::string &key) {
std::lock_guard<std::recursive_mutex> lck(_mtx);
return _map.erase(key);
}
Pointer find(const std::string &key) const {
std::lock_guard<std::recursive_mutex> lck(_mtx);
auto it = _map.find(key);
if (it == _map.end()) {
return nullptr;
}
return it->second;
}
template<class ..._Args>
Pointer make(const std::string &key, _Args&& ...__args) {
// assert(!find(key));
auto server = std::make_shared<Type>(std::forward<_Args>(__args)...);
std::lock_guard<std::recursive_mutex> lck(_mtx);
auto it = _map.emplace(key, server);
assert(it.second);
return server;
}
template<class ..._Args>
Pointer makeWithAction(const std::string &key, function<void(Pointer)> action, _Args&& ...__args) {
// assert(!find(key));
auto server = std::make_shared<Type>(std::forward<_Args>(__args)...);
action(server);
std::lock_guard<std::recursive_mutex> lck(_mtx);
auto it = _map.emplace(key, server);
assert(it.second);
return server;
}
};
//拉流代理器列表 //拉流代理器列表
static unordered_map<string, PlayerProxy::Ptr> s_proxyMap; static ServiceController<PlayerProxy> s_player_proxy;
static recursive_mutex s_proxyMapMtx;
//推流代理器列表 //推流代理器列表
static unordered_map<string, PusherProxy::Ptr> s_proxyPusherMap; static ServiceController<PusherProxy> s_pusher_proxy;
static recursive_mutex s_proxyPusherMapMtx;
//FFmpeg拉流代理器列表 //FFmpeg拉流代理器列表
static unordered_map<string, FFmpegSource::Ptr> s_ffmpegMap; static ServiceController<FFmpegSource> s_ffmpeg_src;
static recursive_mutex s_ffmpegMapMtx;
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
//rtp服务器列表 //rtp服务器列表
static unordered_map<string, RtpServer::Ptr> s_rtpServerMap; static ServiceController<RtpServer> s_rtp_server;
static recursive_mutex s_rtpServerMapMtx;
#endif #endif
static inline string getProxyKey(const string &vhost, const string &app, const string &stream) { static inline string getProxyKey(const string &vhost, const string &app, const string &stream) {
@ -416,46 +465,23 @@ Value makeMediaSourceJson(MediaSource &media){
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); if (s_rtp_server.find(stream_id)) {
if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) {
//为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id //为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id
return 0; return 0;
} }
RtpServer::Ptr server = std::make_shared<RtpServer>(); auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) {
server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex);
});
server->setOnDetach([stream_id]() { server->setOnDetach([stream_id]() {
//设置rtp超时移除事件 //设置rtp超时移除事件
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); s_rtp_server.erase(stream_id);
s_rtpServerMap.erase(stream_id);
}); });
//保存对象
s_rtpServerMap.emplace(stream_id, server);
//回复json //回复json
return server->getPort(); return server->getPort();
} }
void connectRtpServer(const string &stream_id, const string &dst_url, uint16_t dst_port, const function<void(const SockException &ex)> &cb) {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
auto it = s_rtpServerMap.find(stream_id);
if (it == s_rtpServerMap.end()) {
cb(SockException(Err_other, "未找到rtp服务"));
return;
}
it->second->connectToServer(dst_url, dst_port, cb);
}
bool closeRtpServer(const string &stream_id) {
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
auto it = s_rtpServerMap.find(stream_id);
if (it == s_rtpServerMap.end()) {
return false;
}
auto server = it->second;
s_rtpServerMap.erase(it);
return true;
}
#endif #endif
void getStatisticJson(const function<void(Value &val)> &cb) { void getStatisticJson(const function<void(Value &val)> &cb) {
@ -546,15 +572,13 @@ void addStreamProxy(const string &vhost, const string &app, const string &stream
const ProtocolOption &option, int rtp_type, float timeout_sec, const mINI &args, const ProtocolOption &option, int rtp_type, float timeout_sec, const mINI &args,
const function<void(const SockException &ex, const string &key)> &cb) { const function<void(const SockException &ex, const string &key)> &cb) {
auto key = getProxyKey(vhost, app, stream); auto key = getProxyKey(vhost, app, stream);
lock_guard<recursive_mutex> lck(s_proxyMapMtx); if (s_player_proxy.find(key)) {
if (s_proxyMap.find(key) != s_proxyMap.end()) {
//已经在拉流了 //已经在拉流了
cb(SockException(Err_other, "This stream already exists"), key); cb(SockException(Err_other, "This stream already exists"), key);
return; return;
} }
//添加拉流代理 //添加拉流代理
auto player = std::make_shared<PlayerProxy>(vhost, app, stream, option, retry_count); auto player = s_player_proxy.make(key, vhost, app, stream, option, retry_count);
s_proxyMap[key] = player;
// 先透传参数 // 先透传参数
player->mINI::operator=(args); player->mINI::operator=(args);
@ -562,7 +586,7 @@ void addStreamProxy(const string &vhost, const string &app, const string &stream
//指定RTP over TCP(播放rtsp时有效) //指定RTP over TCP(播放rtsp时有效)
(*player)[Client::kRtpType] = rtp_type; (*player)[Client::kRtpType] = rtp_type;
if (timeout_sec > 0.1) { if (timeout_sec > 0.1f) {
//播放握手超时时间 //播放握手超时时间
(*player)[Client::kTimeoutMS] = timeout_sec * 1000; (*player)[Client::kTimeoutMS] = timeout_sec * 1000;
} }
@ -570,20 +594,68 @@ void addStreamProxy(const string &vhost, const string &app, const string &stream
//开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试 //开始播放,如果播放失败或者播放中止,将会自动重试若干次,默认一直重试
player->setPlayCallbackOnce([cb, key](const SockException &ex) { player->setPlayCallbackOnce([cb, key](const SockException &ex) {
if (ex) { if (ex) {
lock_guard<recursive_mutex> lck(s_proxyMapMtx); s_player_proxy.erase(key);
s_proxyMap.erase(key);
} }
cb(ex, key); cb(ex, key);
}); });
//被主动关闭拉流 //被主动关闭拉流
player->setOnClose([key](const SockException &ex) { player->setOnClose([key](const SockException &ex) {
lock_guard<recursive_mutex> lck(s_proxyMapMtx); s_player_proxy.erase(key);
s_proxyMap.erase(key);
}); });
player->play(url); player->play(url);
}; };
void addStreamPusherProxy(const string &schema,
const string &vhost,
const string &app,
const string &stream,
const string &url,
int retry_count,
int rtp_type,
float timeout_sec,
const function<void(const SockException &ex, const string &key)> &cb) {
auto key = getPusherKey(schema, vhost, app, stream, url);
auto src = MediaSource::find(schema, vhost, app, stream);
if (!src) {
cb(SockException(Err_other, "can not find the source stream"), key);
return;
}
if (s_pusher_proxy.find(key)) {
//已经在推流了
cb(SockException(Err_success), key);
return;
}
//添加推流代理
auto pusher = s_pusher_proxy.make(key, src, retry_count);
//指定RTP over TCP(播放rtsp时有效)
pusher->emplace(Client::kRtpType, rtp_type);
if (timeout_sec > 0.1f) {
//推流握手超时时间
pusher->emplace(Client::kTimeoutMS, timeout_sec * 1000);
}
//开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试
pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) {
if (ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex;
s_pusher_proxy.erase(key);
}
cb(ex, key);
});
//被主动关闭推流
pusher->setOnClose([key, url](const SockException &ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex;
s_pusher_proxy.erase(key);
});
pusher->publish(url);
}
template <typename Type> template <typename Type>
static void getArgsValue(const HttpAllArgs<ApiArgsType> &allArgs, const string &key, Type &value) { static void getArgsValue(const HttpAllArgs<ApiArgsType> &allArgs, const string &key, Type &value) {
auto val = allArgs[key]; auto val = allArgs[key];
@ -973,59 +1045,6 @@ void installWebApi() {
val["count_hit"] = (Json::UInt64)count_hit; val["count_hit"] = (Json::UInt64)count_hit;
}); });
static auto addStreamPusherProxy = [](const string &schema,
const string &vhost,
const string &app,
const string &stream,
const string &url,
int retry_count,
int rtp_type,
float timeout_sec,
const function<void(const SockException &ex, const string &key)> &cb) {
auto key = getPusherKey(schema, vhost, app, stream, url);
auto src = MediaSource::find(schema, vhost, app, stream);
if (!src) {
cb(SockException(Err_other, "can not find the source stream"), key);
return;
}
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
if (s_proxyPusherMap.find(key) != s_proxyPusherMap.end()) {
//已经在推流了
cb(SockException(Err_success), key);
return;
}
//添加推流代理
auto pusher = std::make_shared<PusherProxy>(src, retry_count);
s_proxyPusherMap[key] = pusher;
//指定RTP over TCP(播放rtsp时有效)
(*pusher)[Client::kRtpType] = rtp_type;
if (timeout_sec > 0.1) {
//推流握手超时时间
(*pusher)[Client::kTimeoutMS] = timeout_sec * 1000;
}
//开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试
pusher->setPushCallbackOnce([cb, key, url](const SockException &ex) {
if (ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex;
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
s_proxyPusherMap.erase(key);
}
cb(ex, key);
});
//被主动关闭推流
pusher->setOnClose([key, url](const SockException &ex) {
WarnL << "Push " << url << " failed, key: " << key << ", err: " << ex;
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
s_proxyPusherMap.erase(key);
});
pusher->publish(url);
};
//动态添加rtsp/rtmp推流代理 //动态添加rtsp/rtmp推流代理
//测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs
api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) {
@ -1058,8 +1077,7 @@ void installWebApi() {
api_regist("/index/api/delStreamPusherProxy", [](API_ARGS_MAP) { api_regist("/index/api/delStreamPusherProxy", [](API_ARGS_MAP) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx); val["data"]["flag"] = s_pusher_proxy.erase(allArgs["key"]) == 1;
val["data"]["flag"] = s_proxyPusherMap.erase(allArgs["key"]) == 1;
}); });
//动态添加rtsp/rtmp拉流代理 //动态添加rtsp/rtmp拉流代理
@ -1100,8 +1118,7 @@ void installWebApi() {
api_regist("/index/api/delStreamProxy",[](API_ARGS_MAP){ api_regist("/index/api/delStreamProxy",[](API_ARGS_MAP){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
lock_guard<recursive_mutex> lck(s_proxyMapMtx); val["data"]["flag"] = s_player_proxy.erase(allArgs["key"]) == 1;
val["data"]["flag"] = s_proxyMap.erase(allArgs["key"]) == 1;
}); });
static auto addFFmpegSource = [](const string &ffmpeg_cmd_key, static auto addFFmpegSource = [](const string &ffmpeg_cmd_key,
@ -1112,25 +1129,21 @@ void installWebApi() {
bool enable_mp4, bool enable_mp4,
const function<void(const SockException &ex, const string &key)> &cb) { const function<void(const SockException &ex, const string &key)> &cb) {
auto key = MD5(dst_url).hexdigest(); auto key = MD5(dst_url).hexdigest();
lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx); if (s_ffmpeg_src.find(key)) {
if (s_ffmpegMap.find(key) != s_ffmpegMap.end()) {
//已经在拉流了 //已经在拉流了
cb(SockException(Err_success), key); cb(SockException(Err_success), key);
return; return;
} }
FFmpegSource::Ptr ffmpeg = std::make_shared<FFmpegSource>(); auto ffmpeg = s_ffmpeg_src.make(key);
s_ffmpegMap[key] = ffmpeg;
ffmpeg->setOnClose([key]() { ffmpeg->setOnClose([key]() {
lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx); s_ffmpeg_src.erase(key);
s_ffmpegMap.erase(key);
}); });
ffmpeg->setupRecordFlag(enable_hls, enable_mp4); ffmpeg->setupRecordFlag(enable_hls, enable_mp4);
ffmpeg->play(ffmpeg_cmd_key, src_url, dst_url, timeout_ms, [cb, key](const SockException &ex) { ffmpeg->play(ffmpeg_cmd_key, src_url, dst_url, timeout_ms, [cb, key](const SockException &ex) {
if (ex) { if (ex) {
lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx); s_ffmpeg_src.erase(key);
s_ffmpegMap.erase(key);
} }
cb(ex, key); cb(ex, key);
}); });
@ -1164,8 +1177,7 @@ void installWebApi() {
api_regist("/index/api/delFFmpegSource",[](API_ARGS_MAP){ api_regist("/index/api/delFFmpegSource",[](API_ARGS_MAP){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
lock_guard<decltype(s_ffmpegMapMtx)> lck(s_ffmpegMapMtx); val["data"]["flag"] = s_ffmpeg_src.erase(allArgs["key"]) == 1;
val["data"]["flag"] = s_ffmpegMap.erase(allArgs["key"]) == 1;
}); });
//新增http api下载可执行程序文件接口 //新增http api下载可执行程序文件接口
@ -1245,22 +1257,27 @@ void installWebApi() {
api_regist("/index/api/connectRtpServer", [](API_ARGS_MAP_ASYNC) { api_regist("/index/api/connectRtpServer", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id", "dst_url", "dst_port"); CHECK_ARGS("stream_id", "dst_url", "dst_port");
connectRtpServer( auto cb = [val, headerOut, invoker](const SockException &ex) mutable {
allArgs["stream_id"], allArgs["dst_url"], allArgs["dst_port"],
[val, headerOut, invoker](const SockException &ex) mutable {
if (ex) { if (ex) {
val["code"] = API::OtherFailed; val["code"] = API::OtherFailed;
val["msg"] = ex.what(); val["msg"] = ex.what();
} }
invoker(200, headerOut, val.toStyledString()); invoker(200, headerOut, val.toStyledString());
}); };
auto server = s_rtp_server.find(allArgs["stream_id"]);
if (!server) {
cb(SockException(Err_other, "未找到rtp服务"));
return;
}
server->connectToServer(allArgs["dst_url"], allArgs["dst_port"], cb);
}); });
api_regist("/index/api/closeRtpServer",[](API_ARGS_MAP){ api_regist("/index/api/closeRtpServer",[](API_ARGS_MAP){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
if(!closeRtpServer(allArgs["stream_id"])){ if(s_rtp_server.erase(allArgs["stream_id"]) == 0){
val["hit"] = 0; val["hit"] = 0;
return; return;
} }
@ -1271,19 +1288,18 @@ void installWebApi() {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id", "ssrc"); CHECK_ARGS("stream_id", "ssrc");
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); auto server = s_rtp_server.find(allArgs["stream_id"]);
auto it = s_rtpServerMap.find(allArgs["stream_id"]); if (!server) {
if (it == s_rtpServerMap.end()) {
throw ApiRetException("RtpServer not found by stream_id", API::NotFound); throw ApiRetException("RtpServer not found by stream_id", API::NotFound);
} }
it->second->updateSSRC(allArgs["ssrc"]); server->updateSSRC(allArgs["ssrc"]);
}); });
api_regist("/index/api/listRtpServer",[](API_ARGS_MAP){ api_regist("/index/api/listRtpServer",[](API_ARGS_MAP){
CHECK_SECRET(); CHECK_SECRET();
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx); std::lock_guard<std::recursive_mutex> lck(s_rtp_server._mtx);
for (auto &pr : s_rtpServerMap) { for (auto &pr : s_rtp_server._map) {
Value obj; Value obj;
obj["stream_id"] = pr.first; obj["stream_id"] = pr.first;
obj["port"] = pr.second->getPort(); obj["port"] = pr.second->getPort();
@ -1299,9 +1315,10 @@ void installWebApi() {
if (!src) { if (!src) {
throw ApiRetException("can not find the source stream", API::NotFound); throw ApiRetException("can not find the source stream", API::NotFound);
} }
auto type = allArgs["type"].as<int>();
if (!allArgs["use_ps"].empty()) { if (!allArgs["use_ps"].empty()) {
// 兼容之前的use_ps参数 // 兼容之前的use_ps参数
allArgs["type"] = allArgs["use_ps"].as<int>(); type = allArgs["use_ps"].as<int>();
} }
MediaSourceEvent::SendRtpArgs args; MediaSourceEvent::SendRtpArgs args;
args.passive = false; args.passive = false;
@ -1312,11 +1329,11 @@ void installWebApi() {
args.is_udp = allArgs["is_udp"]; args.is_udp = allArgs["is_udp"];
args.src_port = allArgs["src_port"]; args.src_port = allArgs["src_port"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>(); args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.type = (MediaSourceEvent::SendRtpArgs::Type)(allArgs["type"].as<int>()); args.type = (MediaSourceEvent::SendRtpArgs::Type)type;
args.only_audio = allArgs["only_audio"].as<bool>(); args.only_audio = allArgs["only_audio"].as<bool>();
args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"]; args.udp_rtcp_timeout = allArgs["udp_rtcp_timeout"];
args.recv_stream_id = allArgs["recv_stream_id"]; args.recv_stream_id = allArgs["recv_stream_id"];
TraceL << "startSendRtp, pt " << int(args.pt) << " rtp type " << args.type << " audio " << args.only_audio; TraceL << "startSendRtp, pt " << int(args.pt) << " rtp type " << type << " audio " << args.only_audio;
src->getOwnerPoller()->async([=]() mutable { src->getOwnerPoller()->async([=]() mutable {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
@ -1338,10 +1355,10 @@ void installWebApi() {
if (!src) { if (!src) {
throw ApiRetException("can not find the source stream", API::NotFound); throw ApiRetException("can not find the source stream", API::NotFound);
} }
auto type = allArgs["type"].as<int>();
if (!allArgs["use_ps"].empty()) { if (!allArgs["use_ps"].empty()) {
// 兼容之前的use_ps参数 // 兼容之前的use_ps参数
allArgs["type"] = allArgs["use_ps"].as<int>(); type = allArgs["use_ps"].as<int>();
} }
MediaSourceEvent::SendRtpArgs args; MediaSourceEvent::SendRtpArgs args;
@ -1350,12 +1367,12 @@ void installWebApi() {
args.is_udp = false; args.is_udp = false;
args.src_port = allArgs["src_port"]; args.src_port = allArgs["src_port"];
args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>(); args.pt = allArgs["pt"].empty() ? 96 : allArgs["pt"].as<int>();
args.type = (MediaSourceEvent::SendRtpArgs::Type)(allArgs["type"].as<int>()); args.type = (MediaSourceEvent::SendRtpArgs::Type)type;
args.only_audio = allArgs["only_audio"].as<bool>(); args.only_audio = allArgs["only_audio"].as<bool>();
args.recv_stream_id = allArgs["recv_stream_id"]; args.recv_stream_id = allArgs["recv_stream_id"];
//tcp被动服务器等待链接超时时间 //tcp被动服务器等待链接超时时间
args.tcp_passive_close_delay_ms = allArgs["close_delay_ms"]; args.tcp_passive_close_delay_ms = allArgs["close_delay_ms"];
TraceL << "startSendRtpPassive, pt " << int(args.pt) << " rtp type " << args.type << " audio " << args.only_audio; TraceL << "startSendRtpPassive, pt " << int(args.pt) << " rtp type " << type << " audio " << args.only_audio;
src->getOwnerPoller()->async([=]() mutable { src->getOwnerPoller()->async([=]() mutable {
src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable { src->startSendRtp(args, [val, headerOut, invoker](uint16_t local_port, const SockException &ex) mutable {
@ -1517,18 +1534,11 @@ void installWebApi() {
api_regist("/index/api/getProxyPusherInfo", [](API_ARGS_MAP_ASYNC) { api_regist("/index/api/getProxyPusherInfo", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
decltype(s_proxyPusherMap.end()) it; auto pusher = s_pusher_proxy.find(allArgs["key"]);
{ if (!pusher) {
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
it = s_proxyPusherMap.find(allArgs["key"]);
}
if (it == s_proxyPusherMap.end()) {
throw ApiRetException("can not find pusher", API::NotFound); throw ApiRetException("can not find pusher", API::NotFound);
} }
auto pusher = it->second;
val["data"]["status"] = pusher->getStatus(); val["data"]["status"] = pusher->getStatus();
val["data"]["liveSecs"] = pusher->getLiveSecs(); val["data"]["liveSecs"] = pusher->getLiveSecs();
val["data"]["rePublishCount"] = pusher->getRePublishCount(); val["data"]["rePublishCount"] = pusher->getRePublishCount();
@ -1538,18 +1548,11 @@ void installWebApi() {
api_regist("/index/api/getProxyInfo", [](API_ARGS_MAP_ASYNC) { api_regist("/index/api/getProxyInfo", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("key"); CHECK_ARGS("key");
decltype(s_proxyMap.end()) it; auto proxy = s_player_proxy.find(allArgs["key"]);
{ if (!proxy) {
lock_guard<recursive_mutex> lck(s_proxyMapMtx);
it = s_proxyMap.find(allArgs["key"]);
}
if (it == s_proxyMap.end()) {
throw ApiRetException("can not find the proxy", API::NotFound); throw ApiRetException("can not find the proxy", API::NotFound);
} }
auto proxy = it->second;
val["data"]["status"] = proxy->getStatus(); val["data"]["status"] = proxy->getStatus();
val["data"]["liveSecs"] = proxy->getLiveSecs(); val["data"]["liveSecs"] = proxy->getLiveSecs();
val["data"]["rePullCount"] = proxy->getRePullCount(); val["data"]["rePullCount"] = proxy->getRePullCount();
@ -1925,31 +1928,12 @@ void installWebApi() {
} }
void unInstallWebApi(){ void unInstallWebApi(){
{ s_player_proxy.clear();
lock_guard<recursive_mutex> lck(s_proxyMapMtx); s_ffmpeg_src.clear();
auto proxyMap(std::move(s_proxyMap)); s_pusher_proxy.clear();
proxyMap.clear();
}
{
lock_guard<recursive_mutex> lck(s_ffmpegMapMtx);
auto ffmpegMap(std::move(s_ffmpegMap));
ffmpegMap.clear();
}
{
lock_guard<recursive_mutex> lck(s_proxyPusherMapMtx);
auto proxyPusherMap(std::move(s_proxyPusherMap));
proxyPusherMap.clear();
}
{
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
RtpSelector::Instance().clear(); s_rtp_server.clear();
lock_guard<recursive_mutex> lck(s_rtpServerMapMtx);
auto rtpServerMap(std::move(s_rtpServerMap));
rtpServerMap.clear();
#endif #endif
}
NoticeCenter::Instance().delListener(&web_api_tag); NoticeCenter::Instance().delListener(&web_api_tag);
} }

View File

@ -297,6 +297,7 @@ const string kSampleMS = RECORD_FIELD "sampleMS";
const string kFileBufSize = RECORD_FIELD "fileBufSize"; const string kFileBufSize = RECORD_FIELD "fileBufSize";
const string kFastStart = RECORD_FIELD "fastStart"; const string kFastStart = RECORD_FIELD "fastStart";
const string kFileRepeat = RECORD_FIELD "fileRepeat"; const string kFileRepeat = RECORD_FIELD "fileRepeat";
const string kEnableFmp4 = RECORD_FIELD "enableFmp4";
static onceToken token([]() { static onceToken token([]() {
mINI::Instance()[kAppName] = "record"; mINI::Instance()[kAppName] = "record";
@ -304,6 +305,7 @@ static onceToken token([]() {
mINI::Instance()[kFileBufSize] = 64 * 1024; mINI::Instance()[kFileBufSize] = 64 * 1024;
mINI::Instance()[kFastStart] = false; mINI::Instance()[kFastStart] = false;
mINI::Instance()[kFileRepeat] = false; mINI::Instance()[kFileRepeat] = false;
mINI::Instance()[kEnableFmp4] = false;
}); });
} // namespace Record } // namespace Record

View File

@ -354,6 +354,8 @@ extern const std::string kFileBufSize;
extern const std::string kFastStart; extern const std::string kFastStart;
// mp4文件是否重头循环读取 // mp4文件是否重头循环读取
extern const std::string kFileRepeat; extern const std::string kFileRepeat;
// mp4录制文件是否采用fmp4格式
extern const std::string kEnableFmp4;
} // namespace Record } // namespace Record
////////////HLS相关配置/////////// ////////////HLS相关配置///////////

View File

@ -72,7 +72,7 @@ void HlsMakerImp::clearCache(bool immediately, bool eof) {
std::list<std::string> lst; std::list<std::string> lst;
lst.emplace_back(_path_hls); lst.emplace_back(_path_hls);
lst.emplace_back(_path_hls_delay); lst.emplace_back(_path_hls_delay);
if (!_path_init.empty()) { if (!_path_init.empty() && eof) {
lst.emplace_back(_path_init); lst.emplace_back(_path_init);
} }
for (auto &pr : _segment_file_paths) { for (auto &pr : _segment_file_paths) {

View File

@ -31,7 +31,8 @@ void MP4Muxer::openMP4(const string &file) {
MP4FileIO::Writer MP4Muxer::createWriter() { MP4FileIO::Writer MP4Muxer::createWriter() {
GET_CONFIG(bool, mp4FastStart, Record::kFastStart); GET_CONFIG(bool, mp4FastStart, Record::kFastStart);
return _mp4_file->createWriter(mp4FastStart ? MOV_FLAG_FASTSTART : 0, false); GET_CONFIG(bool, recordEnableFmp4, Record::kEnableFmp4);
return _mp4_file->createWriter(mp4FastStart ? MOV_FLAG_FASTSTART : 0, recordEnableFmp4);
} }
void MP4Muxer::closeMP4() { void MP4Muxer::closeMP4() {

View File

@ -31,7 +31,6 @@
#define RTP_CNAME "zlmediakit-rtp" #define RTP_CNAME "zlmediakit-rtp"
#define RTP_LABEL "zlmediakit-label" #define RTP_LABEL "zlmediakit-label"
#define RTP_MSLABEL "zlmediakit-mslabel" #define RTP_MSLABEL "zlmediakit-mslabel"
#define RTP_MSID RTP_MSLABEL " " RTP_LABEL
using namespace std; using namespace std;
@ -707,9 +706,9 @@ void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
// 发送的ssrc我们随便定义因为在发送rtp时会修改为此值 // 发送的ssrc我们随便定义因为在发送rtp时会修改为此值
ssrc.ssrc = m.type + RTP_SSRC_OFFSET; ssrc.ssrc = m.type + RTP_SSRC_OFFSET;
ssrc.cname = RTP_CNAME; ssrc.cname = RTP_CNAME;
ssrc.label = RTP_LABEL; ssrc.label = std::string(RTP_LABEL) + '-' + m.mid;
ssrc.mslabel = RTP_MSLABEL; ssrc.mslabel = RTP_MSLABEL;
ssrc.msid = RTP_MSID; ssrc.msid = ssrc.mslabel + ' ' + ssrc.label;
if (m.getRelatedRtxPlan(m.plan[0].pt)) { if (m.getRelatedRtxPlan(m.plan[0].pt)) {
// rtx ssrc // rtx ssrc