This commit is contained in:
baiyfcu 2024-04-08 10:16:55 +08:00 committed by GitHub
commit fc7ef34d8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 208 additions and 64 deletions

View File

@ -474,6 +474,7 @@ endif()
############################################################################## ##############################################################################
# for version.h # for version.h
update_cached_list(ZLM_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_BINARY_DIR})
# for assert.h # for assert.h

View File

@ -20,10 +20,11 @@ typedef struct mk_rtp_server_t *mk_rtp_server;
* GB28181 RTP * GB28181 RTP
* @param port 0 * @param port 0
* @param tcp_mode tcp模式(0: 1: 2: ) * @param tcp_mode tcp模式(0: 1: 2: )
* @param app_name app名称""使 rtp
* @param stream_id id * @param stream_id id
* @return * @return
*/ */
API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *stream_id); API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *app_name, const char *stream_id);
/** /**
* TCP * TCP

View File

@ -16,9 +16,9 @@ using namespace toolkit;
#include "Rtp/RtpServer.h" #include "Rtp/RtpServer.h"
using namespace mediakit; using namespace mediakit;
API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *stream_id) { API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *app_name, const char *stream_id) {
RtpServer::Ptr *server = new RtpServer::Ptr(new RtpServer); RtpServer::Ptr *server = new RtpServer::Ptr(new RtpServer);
(*server)->start(port, stream_id, (RtpServer::TcpMode)tcp_mode); (*server)->start(port, app_name, stream_id, (RtpServer::TcpMode)tcp_mode);
return (mk_rtp_server)server; return (mk_rtp_server)server;
} }

View File

@ -469,14 +469,14 @@ Value makeMediaSourceJson(MediaSource &media){
} }
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { uint16_t openRtpServer(uint16_t local_port, const string &app_name, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
if (s_rtp_server.find(stream_id)) { if (s_rtp_server.find(stream_id)) {
//为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id //为了防止RtpProcess所有权限混乱的问题不允许重复添加相同的stream_id
return 0; return 0;
} }
auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) { auto server = s_rtp_server.makeWithAction(stream_id, [&](RtpServer::Ptr server) {
server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex); server->start(local_port, app_name, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc, only_track, multiplex);
}); });
server->setOnDetach([stream_id]() { server->setOnDetach([stream_id]() {
//设置rtp超时移除事件 //设置rtp超时移除事件
@ -1192,7 +1192,7 @@ void installWebApi() {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); auto process = RtpSelector::Instance().getProcess(allArgs["app_name"], allArgs["stream_id"], false);
if (!process) { if (!process) {
val["exist"] = false; val["exist"] = false;
return; return;
@ -1204,6 +1204,7 @@ void installWebApi() {
api_regist("/index/api/openRtpServer",[](API_ARGS_MAP){ api_regist("/index/api/openRtpServer",[](API_ARGS_MAP){
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("port", "stream_id"); CHECK_ARGS("port", "stream_id");
auto app_name = allArgs["app_name"]; // 默认rtp
auto stream_id = allArgs["stream_id"]; auto stream_id = allArgs["stream_id"];
auto tcp_mode = allArgs["tcp_mode"].as<int>(); auto tcp_mode = allArgs["tcp_mode"].as<int>();
if (allArgs["enable_tcp"].as<int>() && !tcp_mode) { if (allArgs["enable_tcp"].as<int>() && !tcp_mode) {
@ -1219,7 +1220,8 @@ void installWebApi() {
if (!allArgs["local_ip"].empty()) { if (!allArgs["local_ip"].empty()) {
local_ip = allArgs["local_ip"]; local_ip = allArgs["local_ip"];
} }
auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, allArgs["re_use_port"].as<bool>(), auto port = openRtpServer(
allArgs["port"], app_name, stream_id, tcp_mode, local_ip, allArgs["re_use_port"].as<bool>(),
allArgs["ssrc"].as<uint32_t>(), only_track); allArgs["ssrc"].as<uint32_t>(), only_track);
if (port == 0) { if (port == 0) {
throw InvalidArgsException("该stream_id已存在"); throw InvalidArgsException("该stream_id已存在");
@ -1231,6 +1233,7 @@ void installWebApi() {
api_regist("/index/api/openRtpServerMultiplex", [](API_ARGS_MAP) { api_regist("/index/api/openRtpServerMultiplex", [](API_ARGS_MAP) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("port", "stream_id"); CHECK_ARGS("port", "stream_id");
auto app_name = allArgs["app_name"];
auto stream_id = allArgs["stream_id"]; auto stream_id = allArgs["stream_id"];
auto tcp_mode = allArgs["tcp_mode"].as<int>(); auto tcp_mode = allArgs["tcp_mode"].as<int>();
if (allArgs["enable_tcp"].as<int>() && !tcp_mode) { if (allArgs["enable_tcp"].as<int>() && !tcp_mode) {
@ -1246,7 +1249,7 @@ void installWebApi() {
if (!allArgs["local_ip"].empty()) { if (!allArgs["local_ip"].empty()) {
local_ip = allArgs["local_ip"]; local_ip = allArgs["local_ip"];
} }
auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, local_ip, true, 0, only_track,true); auto port = openRtpServer(allArgs["port"], app_name, stream_id, tcp_mode, local_ip, true, 0, only_track, true);
if (port == 0) { if (port == 0) {
throw InvalidArgsException("该stream_id已存在"); throw InvalidArgsException("该stream_id已存在");
} }
@ -1303,6 +1306,7 @@ void installWebApi() {
Value obj; Value obj;
obj["stream_id"] = pr.first; obj["stream_id"] = pr.first;
obj["port"] = pr.second->getPort(); obj["port"] = pr.second->getPort();
obj["app_name"] = pr.second->getAppName();
val["data"].append(obj); val["data"].append(obj);
} }
}); });
@ -1411,7 +1415,7 @@ void installWebApi() {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
//只是暂停流的检查流媒体服务器做为流负载服务收流就转发RTSP/RTMP有自己暂停协议 //只是暂停流的检查流媒体服务器做为流负载服务收流就转发RTSP/RTMP有自己暂停协议
auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); auto rtp_process = RtpSelector::Instance().getProcess(allArgs["app"], allArgs["stream_id"], false);
if (rtp_process) { if (rtp_process) {
rtp_process->setStopCheckRtp(true); rtp_process->setStopCheckRtp(true);
} else { } else {
@ -1422,7 +1426,7 @@ void installWebApi() {
api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) { api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) {
CHECK_SECRET(); CHECK_SECRET();
CHECK_ARGS("stream_id"); CHECK_ARGS("stream_id");
auto rtp_process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); auto rtp_process = RtpSelector::Instance().getProcess(allArgs["app"], allArgs["stream_id"], false);
if (rtp_process) { if (rtp_process) {
rtp_process->setStopCheckRtp(false); rtp_process->setStopCheckRtp(false);
} else { } else {

View File

@ -202,7 +202,9 @@ void installWebApi();
void unInstallWebApi(); void unInstallWebApi();
#if defined(ENABLE_RTPPROXY) #if defined(ENABLE_RTPPROXY)
uint16_t openRtpServer(uint16_t local_port, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex=false); uint16_t openRtpServer(uint16_t local_port, const std::string &app_name, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex=false);
//void connectRtpServer(const std::string &stream_id, const std::string &dst_url, uint16_t dst_port, const std::function<void(const toolkit::SockException &ex)> &cb);
//bool closeRtpServer(const std::string &stream_id);
#endif #endif
Json::Value makeMediaSourceJson(mediakit::MediaSource &media); Json::Value makeMediaSourceJson(mediakit::MediaSource &media);

View File

@ -302,6 +302,8 @@ void RtspUrl::setup(bool is_ssl, const string &url, const string &user, const st
} }
static void inline checkHost(std::string &host) { static void inline checkHost(std::string &host) {
if (host.size() == 0)
return;
if (host.back() == ']' && host.front() == '[') { if (host.back() == ']' && host.front() == '[') {
// ipv6去除方括号 // ipv6去除方括号
host.pop_back(); host.pop_back();

View File

@ -262,6 +262,7 @@ const string kAudioMtuSize = RTP_FIELD "audioMtuSize";
const string kRtpMaxSize = RTP_FIELD "rtpMaxSize"; const string kRtpMaxSize = RTP_FIELD "rtpMaxSize";
const string kLowLatency = RTP_FIELD "lowLatency"; const string kLowLatency = RTP_FIELD "lowLatency";
const string kH264StapA = RTP_FIELD "h264_stap_a"; const string kH264StapA = RTP_FIELD "h264_stap_a";
const string kRtpAppName = RTP_FIELD "rtpAppName";
static onceToken token([]() { static onceToken token([]() {
mINI::Instance()[kVideoMtuSize] = 1400; mINI::Instance()[kVideoMtuSize] = 1400;
@ -269,6 +270,7 @@ static onceToken token([]() {
mINI::Instance()[kRtpMaxSize] = 10; mINI::Instance()[kRtpMaxSize] = 10;
mINI::Instance()[kLowLatency] = 0; mINI::Instance()[kLowLatency] = 0;
mINI::Instance()[kH264StapA] = 1; mINI::Instance()[kH264StapA] = 1;
mINI::Instance()[kRtpAppName] = "rtp";
}); });
} // namespace Rtp } // namespace Rtp

View File

@ -330,6 +330,8 @@ extern const std::string kRtpMaxSize;
extern const std::string kLowLatency; extern const std::string kLowLatency;
//H264 rtp打包模式是否采用stap-a模式(为了在老版本浏览器上兼容webrtc)还是采用Single NAL unit packet per H.264 模式 //H264 rtp打包模式是否采用stap-a模式(为了在老版本浏览器上兼容webrtc)还是采用Single NAL unit packet per H.264 模式
extern const std::string kH264StapA; extern const std::string kH264StapA;
// rtp server app名称默认 rtp
extern const std::string kRtpAppName;
} // namespace Rtp } // namespace Rtp
////////////组播配置/////////// ////////////组播配置///////////

View File

@ -153,23 +153,95 @@ void Gb2312ToUnicode(wchar_t* pOut, const char *gbBuffer)
{ {
MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, gbBuffer, 2, pOut, 1); MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, gbBuffer, 2, pOut, 1);
} }
#else
#include <iconv.h>
// 将 GB2312 编码的字符串转换为 UTF-8 编码
char *gb2312_to_utf8(const char *gb2312_string) {
size_t in_len = strlen(gb2312_string);
size_t out_len = in_len * 4; // UTF-8 最多需要 4 倍空间
iconv_t cd = iconv_open("UTF-8", "GBK");//GBK 是在 GB2312 的基础上进行扩展的字符集,包含了 GB2312 中的所有字符
if (cd == (iconv_t)-1) {
perror("iconv_open");
return NULL;
}
char *inbuf = (char *)gb2312_string;
char *outbuf = (char *)malloc(out_len + 1); // 分配足够的空间来存储转换后的字符串
if (outbuf == NULL) {
perror("malloc");
iconv_close(cd);
return NULL;
}
memset(outbuf, 0, out_len + 1);
char *inptr = inbuf;
char *outptr = outbuf;
if (iconv(cd, &inptr, &in_len, &outptr, &out_len) == (size_t)-1) {
perror("iconv");
free(outbuf);
iconv_close(cd);
return NULL;
}
iconv_close(cd);
return outbuf;
}
// 跨平台的 UTF-8 转 GB2312 编码
char *utf8_to_gb2312(const char *utf8_string) {
char *result = NULL;
// 非 Windows 平台使用 iconv 函数进行编码转换
size_t in_len = strlen(utf8_string);
size_t out_len = in_len * 4; // GB2312 最多需要 4 倍空间
iconv_t cd = iconv_open("GBK", "UTF-8");
if (cd == (iconv_t)-1) {
perror("iconv_open");
return NULL;
}
char *inbuf = (char *)utf8_string;
char *outbuf = (char *)malloc(out_len + 1); // 分配足够的空间来存储转换后的字符串
if (outbuf == NULL) {
perror("malloc");
iconv_close(cd);
return NULL;
}
memset(outbuf, 0, out_len + 1);
char *inptr = inbuf;
char *outptr = outbuf;
if (iconv(cd, &inptr, &in_len, &outptr, &out_len) == (size_t)-1) {
perror("iconv");
free(outbuf);
iconv_close(cd);
return NULL;
}
iconv_close(cd);
result = outbuf;
return result;
}
#endif//defined(_WIN32)
string strCoding::UTF8ToGB2312(const string &str) { string strCoding::UTF8ToGB2312(const string &str) {
#ifdef WIN32
auto len = str.size(); auto len = str.size();
auto pText = str.data(); auto pText = str.data();
char Ctemp[4] = {0}; char Ctemp[4] = { 0 };
char *pOut = new char[len + 1]; char *pOut = new char[len + 1];
memset(pOut, 0, len + 1); memset(pOut, 0, len + 1);
int i = 0, j = 0; int i = 0, j = 0;
while (i < len) while (i < len) {
{ if (pText[i] >= 0) {
if (pText[i] >= 0)
{
pOut[j++] = pText[i++]; pOut[j++] = pText[i++];
} } else {
else
{
wchar_t Wtemp; wchar_t Wtemp;
UTF8ToUnicode(&Wtemp, pText + i); UTF8ToUnicode(&Wtemp, pText + i);
UnicodeToGB2312(Ctemp, Wtemp); UnicodeToGB2312(Ctemp, Wtemp);
@ -182,25 +254,31 @@ string strCoding::UTF8ToGB2312(const string &str) {
string ret = pOut; string ret = pOut;
delete[] pOut; delete[] pOut;
return ret; return ret;
#else
char *gb2312_string = utf8_to_gb2312(str.c_str());
if (gb2312_string == NULL) {
return "";
}
string result(gb2312_string);
free(gb2312_string);
return result;
#endif
} }
string strCoding::GB2312ToUTF8(const string &str) { string strCoding::GB2312ToUTF8(const string &str) {
#ifdef WIN32
auto len = str.size(); auto len = str.size();
auto pText = str.data(); auto pText = str.data();
char buf[4] = { 0 }; char buf[4] = { 0 };
auto nLength = len * 3; auto nLength = len * 3;
char* pOut = new char[nLength]; char *pOut = new char[nLength];
memset(pOut, 0, nLength); memset(pOut, 0, nLength);
size_t i = 0, j = 0; size_t i = 0, j = 0;
while (i < len) while (i < len) {
{ // 如果是英文直接复制就可以
//如果是英文直接复制就可以 if (*(pText + i) >= 0) {
if (*(pText + i) >= 0)
{
pOut[j++] = pText[i++]; pOut[j++] = pText[i++];
} } else {
else
{
wchar_t pbuffer; wchar_t pbuffer;
Gb2312ToUnicode(&pbuffer, pText + i); Gb2312ToUnicode(&pbuffer, pText + i);
UnicodeToUTF8(buf, &pbuffer); UnicodeToUTF8(buf, &pbuffer);
@ -214,7 +292,15 @@ string strCoding::GB2312ToUTF8(const string &str) {
string ret = pOut; string ret = pOut;
delete[] pOut; delete[] pOut;
return ret; return ret;
#else
char *utf8_string = gb2312_to_utf8(str.c_str());
if (utf8_string == NULL) {
return "";
}
string result(utf8_string);
free(utf8_string);
return result;
#endif
} }
#endif//defined(_WIN32)
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -22,10 +22,10 @@ public:
static std::string UrlEncodeComponent(const std::string &str); // url参数 utf8编码 static std::string UrlEncodeComponent(const std::string &str); // url参数 utf8编码
static std::string UrlDecodePath(const std::string &str); //url路径 utf8解码 static std::string UrlDecodePath(const std::string &str); //url路径 utf8解码
static std::string UrlDecodeComponent(const std::string &str); // url参数 utf8解码 static std::string UrlDecodeComponent(const std::string &str); // url参数 utf8解码
#if defined(_WIN32)
static std::string UTF8ToGB2312(const std::string &str);//utf_8转为gb2312 static std::string UTF8ToGB2312(const std::string &str);//utf_8转为gb2312
static std::string GB2312ToUTF8(const std::string &str); //gb2312 转utf_8 static std::string GB2312ToUTF8(const std::string &str); //gb2312 转utf_8
#endif//defined(_WIN32)
private: private:
strCoding(void); strCoding(void);
virtual ~strCoding(void); virtual ~strCoding(void);

View File

@ -203,7 +203,8 @@ protected:
} }
//分片缓存太大,需要清空 //分片缓存太大,需要清空
} }
if (!_session)
break;
//最后一个包 //最后一个包
if (_payload_cache.empty()) { if (_payload_cache.empty()) {
//这个包是唯一个分片 //这个包是唯一个分片

View File

@ -63,6 +63,8 @@ void MP4Demuxer::onVideoTrack(uint32_t track, uint8_t object, int width, int hei
} }
video->setIndex(track); video->setIndex(track);
_tracks.emplace(track, video); _tracks.emplace(track, video);
if (_on_track_callback)
_on_track_callback(video);
if (extra && bytes) { if (extra && bytes) {
video->setExtraData((uint8_t *)extra, bytes); video->setExtraData((uint8_t *)extra, bytes);
} }
@ -75,6 +77,8 @@ void MP4Demuxer::onAudioTrack(uint32_t track, uint8_t object, int channel_count,
} }
audio->setIndex(track); audio->setIndex(track);
_tracks.emplace(track, audio); _tracks.emplace(track, audio);
if (_on_track_callback)
_on_track_callback(audio);
if (extra && bytes) { if (extra && bytes) {
audio->setExtraData((uint8_t *)extra, bytes); audio->setExtraData((uint8_t *)extra, bytes);
} }
@ -100,7 +104,10 @@ struct Context {
Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) { Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) {
keyFrame = false; keyFrame = false;
eof = false; eof = false;
if (!_mov_reader) {
eof = true;
return nullptr;
}
static mov_reader_onread2 mov_onalloc = [](void *param, uint32_t track_id, size_t bytes, int64_t pts, int64_t dts, int flags) -> void * { static mov_reader_onread2 mov_onalloc = [](void *param, uint32_t track_id, size_t bytes, int64_t pts, int64_t dts, int flags) -> void * {
Context *ctx = (Context *) param; Context *ctx = (Context *) param;
ctx->pts = pts; ctx->pts = pts;

View File

@ -20,25 +20,25 @@ class MP4Demuxer : public TrackSource {
public: public:
using Ptr = std::shared_ptr<MP4Demuxer>; using Ptr = std::shared_ptr<MP4Demuxer>;
~MP4Demuxer() override; virtual ~MP4Demuxer() override;
/** /**
* *
* @param file mp4文件路径 * @param file mp4文件路径
*/ */
void openMP4(const std::string &file); virtual void openMP4(const std::string &file);
/** /**
* @brief mp4 * @brief mp4
*/ */
void closeMP4(); virtual void closeMP4();
/** /**
* *
* @param stamp_ms * @param stamp_ms
* @return * @return
*/ */
int64_t seekTo(int64_t stamp_ms); virtual int64_t seekTo(int64_t stamp_ms);
/** /**
* *
@ -46,20 +46,22 @@ public:
* @param eof * @param eof
* @return , * @return ,
*/ */
Frame::Ptr readFrame(bool &keyFrame, bool &eof); virtual Frame::Ptr readFrame(bool &keyFrame, bool &eof);
/** /**
* Track信息 * Track信息
* @param trackReady track为就绪状态 * @param trackReady track为就绪状态
* @return Track * @return Track
*/ */
std::vector<Track::Ptr> getTracks(bool trackReady) const override; virtual std::vector<Track::Ptr> getTracks(bool trackReady) const override;
/** /**
* *
* @return * @return
*/ */
uint64_t getDurationMS() const; virtual uint64_t getDurationMS() const;
virtual void setOnTrack(const std::function<void(Track::Ptr &track)> &callback) { _on_track_callback = callback; }
private: private:
int getAllTracks(); int getAllTracks();
@ -73,6 +75,7 @@ private:
uint64_t _duration_ms = 0; uint64_t _duration_ms = 0;
std::unordered_map<int, Track::Ptr> _tracks; std::unordered_map<int, Track::Ptr> _tracks;
toolkit::ResourcePool<toolkit::BufferRaw> _buffer_pool; toolkit::ResourcePool<toolkit::BufferRaw> _buffer_pool;
std::function<void(Track::Ptr &track)> _on_track_callback;
}; };

View File

@ -19,17 +19,18 @@
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
static constexpr char kRtpAppName[] = "rtp"; static constexpr char kRtpSchemaName[] = "rtp";
//在创建_muxer对象前(也就是推流鉴权成功前)需要先缓存frame这样可以防止丢包提高体验 //在创建_muxer对象前(也就是推流鉴权成功前)需要先缓存frame这样可以防止丢包提高体验
//但是同时需要控制缓冲长度防止内存溢出。200帧数据大概有10秒数据应该足矣等待鉴权hook返回 //但是同时需要控制缓冲长度防止内存溢出。200帧数据大概有10秒数据应该足矣等待鉴权hook返回
static constexpr size_t kMaxCachedFrame = 200; static constexpr size_t kMaxCachedFrame = 200;
namespace mediakit { namespace mediakit {
RtpProcess::RtpProcess(const string &stream_id) { RtpProcess::RtpProcess(const string &app_name, const string &stream_id) {
_media_info.schema = kRtpAppName; GET_CONFIG(string, kRtpAppName, Rtp::kRtpAppName);
_media_info.schema = kRtpSchemaName;
_media_info.vhost = DEFAULT_VHOST; _media_info.vhost = DEFAULT_VHOST;
_media_info.app = kRtpAppName; _media_info.app = app_name.length() > 0 ? app_name : kRtpAppName;
_media_info.stream = stream_id; _media_info.stream = stream_id;
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);

View File

@ -22,7 +22,7 @@ class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, pu
public: public:
using Ptr = std::shared_ptr<RtpProcess>; using Ptr = std::shared_ptr<RtpProcess>;
friend class RtpProcessHelper; friend class RtpProcessHelper;
RtpProcess(const std::string &stream_id); RtpProcess(const std::string &app_name, const std::string &stream_id);
~RtpProcess(); ~RtpProcess();
enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 }; enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 };

View File

@ -35,8 +35,9 @@ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
return true; return true;
} }
RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { RtpProcess::Ptr RtpSelector::getProcess(const string &app_name, const string &stream_id,bool makeNew) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map); lock_guard<decltype(_mtx_map)> lck(_mtx_map);
string app_name_origin = app_name;
string stream_id_origin = stream_id; string stream_id_origin = stream_id;
auto it_replace = _map_stream_replace.find(stream_id); auto it_replace = _map_stream_replace.find(stream_id);
if (it_replace != _map_stream_replace.end()) { if (it_replace != _map_stream_replace.end()) {
@ -53,7 +54,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
} }
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin]; RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin];
if (!ref) { if (!ref) {
ref = std::make_shared<RtpProcessHelper>(stream_id_origin, shared_from_this()); ref = std::make_shared<RtpProcessHelper>(app_name_origin, stream_id_origin, shared_from_this());
ref->attachEvent(); ref->attachEvent();
createTimer(); createTimer();
} }
@ -128,10 +129,11 @@ void RtpSelector::onManager() {
}); });
} }
RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) { RtpProcessHelper::RtpProcessHelper(const string &app_name, const string &stream_id, const weak_ptr<RtpSelector> &parent) {
_app_name = app_name;
_stream_id = stream_id; _stream_id = stream_id;
_parent = parent; _parent = parent;
_process = std::make_shared<RtpProcess>(stream_id); _process = std::make_shared<RtpProcess>(app_name, stream_id);
} }
RtpProcessHelper::~RtpProcessHelper() { RtpProcessHelper::~RtpProcessHelper() {

View File

@ -24,7 +24,7 @@ class RtpSelector;
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> { class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
public: public:
using Ptr = std::shared_ptr<RtpProcessHelper>; using Ptr = std::shared_ptr<RtpProcessHelper>;
RtpProcessHelper(const std::string &stream_id, const std::weak_ptr<RtpSelector > &parent); RtpProcessHelper(const std::string &app_name, const std::string &stream_id, const std::weak_ptr<RtpSelector> &parent);
~RtpProcessHelper(); ~RtpProcessHelper();
void attachEvent(); void attachEvent();
RtpProcess::Ptr & getProcess(); RtpProcess::Ptr & getProcess();
@ -34,6 +34,7 @@ protected:
bool close(MediaSource &sender) override; bool close(MediaSource &sender) override;
private: private:
std::string _app_name;
std::string _stream_id; std::string _stream_id;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
std::weak_ptr<RtpSelector> _parent; std::weak_ptr<RtpSelector> _parent;
@ -57,11 +58,12 @@ public:
/** /**
* rtp处理器 * rtp处理器
* @param app_name app名称 ""使 rtp
* @param stream_id id * @param stream_id id
* @param makeNew , true时 * @param makeNew , true时
* @return rtp处理器 * @return rtp处理器
*/ */
RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew); RtpProcess::Ptr getProcess(const std::string &app_name, const std::string &stream_id, bool makeNew);
/** /**
* rtp处理器 * rtp处理器

View File

@ -30,8 +30,9 @@ class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> {
public: public:
using Ptr = std::shared_ptr<RtcpHelper>; using Ptr = std::shared_ptr<RtcpHelper>;
RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) { RtcpHelper(Socket::Ptr rtcp_sock, std::string app_name, std::string stream_id) {
_rtcp_sock = std::move(rtcp_sock); _rtcp_sock = std::move(rtcp_sock);
_app_name = app_name;
_stream_id = std::move(stream_id); _stream_id = std::move(stream_id);
} }
@ -60,7 +61,7 @@ public:
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) { void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) { if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true); _process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true);
_process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track); _process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
_process->setOnDetach(std::move(_on_detach)); _process->setOnDetach(std::move(_on_detach));
cancelDelayTask(); cancelDelayTask();
@ -96,7 +97,7 @@ public:
GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec); GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec);
_delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() { _delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() {
if (auto strong_self = weak_self.lock()) { if (auto strong_self = weak_self.lock()) {
auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false); auto process = RtpSelector::Instance().getProcess(strong_self->_app_name, strong_self->_stream_id, false);
if (!process && strong_self->_on_detach) { if (!process && strong_self->_on_detach) {
strong_self->_on_detach(); strong_self->_on_detach();
} }
@ -119,6 +120,10 @@ public:
} }
} }
string getAppName() { return _app_name; }
string getStreamId() { return _stream_id; }
private: private:
void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) { void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) {
// 每5秒发送一次rtcp // 每5秒发送一次rtcp
@ -150,13 +155,14 @@ private:
Ticker _ticker; Ticker _ticker;
Socket::Ptr _rtcp_sock; Socket::Ptr _rtcp_sock;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;
std::string _app_name;
std::string _stream_id; std::string _stream_id;
function<void()> _on_detach; function<void()> _on_detach;
std::shared_ptr<struct sockaddr_storage> _rtcp_addr; std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
EventPoller::DelayTask::Ptr _delay_task; EventPoller::DelayTask::Ptr _delay_task;
}; };
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) { void RtpServer::start(uint16_t local_port, const string &app_name, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
//创建udp服务器 //创建udp服务器
Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true);
Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true);
@ -199,7 +205,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
//增加了多路复用判断如果多路复用为true就走else逻辑同时保留了原来stream_id为空走else逻辑 //增加了多路复用判断如果多路复用为true就走else逻辑同时保留了原来stream_id为空走else逻辑
if (!stream_id.empty() && !multiplex) { if (!stream_id.empty() && !multiplex) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流) //指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id); helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), app_name, stream_id);
helper->startRtcp(); helper->startRtcp();
helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track); helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track);
bool bind_peer_addr = false; bool bind_peer_addr = false;
@ -252,6 +258,14 @@ uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port(); return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port();
} }
std::string RtpServer::getAppName() {
return _rtcp_helper ? _rtcp_helper->getAppName() : "";
}
std::string RtpServer::getStreamId() {
return _rtcp_helper ? _rtcp_helper->getStreamId() : "";
}
void RtpServer::connectToServer(const std::string &url, uint16_t port, const function<void(const SockException &ex)> &cb) { void RtpServer::connectToServer(const std::string &url, uint16_t port, const function<void(const SockException &ex)> &cb) {
if (_tcp_mode != ACTIVE || !_rtp_socket) { if (_tcp_mode != ACTIVE || !_rtp_socket) {
cb(SockException(Err_other, "仅支持tcp主动模式")); cb(SockException(Err_other, "仅支持tcp主动模式"));

View File

@ -43,7 +43,8 @@ public:
* @param ssrc ssrc * @param ssrc ssrc
* @param multiplex * @param multiplex
*/ */
void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, void start(
uint16_t local_port, const std::string &app_name = "", const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE,
const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false); const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false);
/** /**
@ -59,6 +60,16 @@ public:
*/ */
uint16_t getPort(); uint16_t getPort();
/**
* app名称
*/
std::string getAppName();
/**
* stream id
*/
std::string getStreamId();
/** /**
* RtpProcess onDetach事件回调 * RtpProcess onDetach事件回调
*/ */

View File

@ -21,6 +21,7 @@ using namespace toolkit;
namespace mediakit{ namespace mediakit{
const string RtpSession::kAppName = "app_name";
const string RtpSession::kStreamID = "stream_id"; const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kSSRC = "ssrc"; const string RtpSession::kSSRC = "ssrc";
const string RtpSession::kOnlyTrack = "only_track"; const string RtpSession::kOnlyTrack = "only_track";
@ -31,6 +32,7 @@ void RtpSession::attachServer(const Server &server) {
} }
void RtpSession::setParams(mINI &ini) { void RtpSession::setParams(mINI &ini) {
_app_name = ini[kAppName];
_stream_id = ini[kStreamID]; _stream_id = ini[kStreamID];
_ssrc = ini[kSSRC]; _ssrc = ini[kSSRC];
_only_track = ini[kOnlyTrack]; _only_track = ini[kOnlyTrack];
@ -114,7 +116,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
_stream_id = printSSRC(_ssrc); _stream_id = printSSRC(_ssrc);
} }
try { try {
_process = RtpSelector::Instance().getProcess(_stream_id, true); _process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true);
} catch (RtpSelector::ProcessExisted &ex) { } catch (RtpSelector::ProcessExisted &ex) {
if (!_is_udp) { if (!_is_udp) {
// tcp情况下立即断开连接 // tcp情况下立即断开连接

View File

@ -22,6 +22,7 @@ namespace mediakit{
class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent { class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent {
public: public:
static const std::string kAppName;
static const std::string kStreamID; static const std::string kStreamID;
static const std::string kSSRC; static const std::string kSSRC;
static const std::string kOnlyTrack; static const std::string kOnlyTrack;
@ -55,6 +56,7 @@ private:
int _only_track = 0; int _only_track = 0;
uint32_t _ssrc = 0; uint32_t _ssrc = 0;
toolkit::Ticker _ticker; toolkit::Ticker _ticker;
std::string _app_name;
std::string _stream_id; std::string _stream_id;
struct sockaddr_storage _addr; struct sockaddr_storage _addr;
RtpProcess::Ptr _process; RtpProcess::Ptr _process;

View File

@ -426,9 +426,8 @@ void RtspPlayer::sendPause(int type, uint32_t seekMS) {
// 开启或暂停rtsp // 开启或暂停rtsp
switch (type) { switch (type) {
case type_pause: sendRtspRequest("PAUSE", _control_url, {}); break; case type_pause: sendRtspRequest("PAUSE", _control_url, {}); break;
case type_play: case type_play: sendRtspRequest("PLAY", _control_url);
// sendRtspRequest("PLAY", _content_base); break;
// break;
case type_seek: case type_seek:
sendRtspRequest("PLAY", _control_url, { "Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-" }); sendRtspRequest("PLAY", _control_url, { "Range", StrPrinter << "npt=" << setiosflags(ios::fixed) << setprecision(2) << seekMS / 1000.0 << "-" });
break; break;
@ -440,7 +439,7 @@ void RtspPlayer::sendPause(int type, uint32_t seekMS) {
} }
void RtspPlayer::pause(bool bPause) { void RtspPlayer::pause(bool bPause) {
sendPause(bPause ? type_pause : type_seek, getProgressMilliSecond()); sendPause(bPause ? type_pause : type_play, getProgressMilliSecond());
} }
void RtspPlayer::speed(float speed) { void RtspPlayer::speed(float speed) {

View File

@ -42,7 +42,7 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) {
memset(&addr, 0, sizeof(addr)); memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET; addr.ss_family = AF_INET;
auto sock = Socket::createSocket(poller); auto sock = Socket::createSocket(poller);
auto process = RtpSelector::Instance().getProcess("test", true); auto process = RtpSelector::Instance().getProcess("", "test", true);
uint64_t stamp_last = 0; uint64_t stamp_last = 0;
auto total_size = std::make_shared<size_t>(0); auto total_size = std::make_shared<size_t>(0);