添加rtp server支持自定义app名称

This commit is contained in:
baiyfcu 2024-04-07 13:59:57 +08:00
parent e7eae11891
commit d771530316
10 changed files with 55 additions and 19 deletions

View File

@ -330,6 +330,8 @@ extern const std::string kRtpMaxSize;
extern const std::string kLowLatency;
//H264 rtp打包模式是否采用stap-a模式(为了在老版本浏览器上兼容webrtc)还是采用Single NAL unit packet per H.264 模式
extern const std::string kH264StapA;
// rtp server app名称默认 rtp
extern const std::string kRtpAppName;
} // namespace Rtp
////////////组播配置///////////

View File

@ -19,17 +19,18 @@
using namespace std;
using namespace toolkit;
static constexpr char kRtpAppName[] = "rtp";
static constexpr char kRtpSchemaName[] = "rtp";
//在创建_muxer对象前(也就是推流鉴权成功前)需要先缓存frame这样可以防止丢包提高体验
//但是同时需要控制缓冲长度防止内存溢出。200帧数据大概有10秒数据应该足矣等待鉴权hook返回
static constexpr size_t kMaxCachedFrame = 200;
namespace mediakit {
RtpProcess::RtpProcess(const string &stream_id) {
_media_info.schema = kRtpAppName;
RtpProcess::RtpProcess(const string &app_name, const string &stream_id) {
GET_CONFIG(string, kRtpAppName, Rtp::kRtpAppName);
_media_info.schema = kRtpSchemaName;
_media_info.vhost = DEFAULT_VHOST;
_media_info.app = kRtpAppName;
_media_info.app = app_name.length() > 0 ? app_name : kRtpAppName;
_media_info.stream = stream_id;
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);

View File

@ -22,7 +22,7 @@ class RtpProcess final : public RtcpContextForRecv, public toolkit::SockInfo, pu
public:
using Ptr = std::shared_ptr<RtpProcess>;
friend class RtpProcessHelper;
RtpProcess(const std::string &stream_id);
RtpProcess(const std::string &app_name, const std::string &stream_id);
~RtpProcess();
enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 };

View File

@ -35,8 +35,9 @@ bool RtpSelector::getSSRC(const char *data, size_t data_len, uint32_t &ssrc){
return true;
}
RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
RtpProcess::Ptr RtpSelector::getProcess(const string &app_name, const string &stream_id,bool makeNew) {
lock_guard<decltype(_mtx_map)> lck(_mtx_map);
string app_name_origin = app_name;
string stream_id_origin = stream_id;
auto it_replace = _map_stream_replace.find(stream_id);
if (it_replace != _map_stream_replace.end()) {
@ -53,7 +54,7 @@ RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) {
}
RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id_origin];
if (!ref) {
ref = std::make_shared<RtpProcessHelper>(stream_id_origin, shared_from_this());
ref = std::make_shared<RtpProcessHelper>(app_name_origin, stream_id_origin, shared_from_this());
ref->attachEvent();
createTimer();
}
@ -128,10 +129,11 @@ void RtpSelector::onManager() {
});
}
RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr<RtpSelector> &parent) {
RtpProcessHelper::RtpProcessHelper(const string &app_name, const string &stream_id, const weak_ptr<RtpSelector> &parent) {
_app_name = app_name;
_stream_id = stream_id;
_parent = parent;
_process = std::make_shared<RtpProcess>(stream_id);
_process = std::make_shared<RtpProcess>(app_name, stream_id);
}
RtpProcessHelper::~RtpProcessHelper() {

View File

@ -24,7 +24,7 @@ class RtpSelector;
class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this<RtpProcessHelper> {
public:
using Ptr = std::shared_ptr<RtpProcessHelper>;
RtpProcessHelper(const std::string &stream_id, const std::weak_ptr<RtpSelector > &parent);
RtpProcessHelper(const std::string &app_name, const std::string &stream_id, const std::weak_ptr<RtpSelector> &parent);
~RtpProcessHelper();
void attachEvent();
RtpProcess::Ptr & getProcess();
@ -34,6 +34,7 @@ protected:
bool close(MediaSource &sender) override;
private:
std::string _app_name;
std::string _stream_id;
RtpProcess::Ptr _process;
std::weak_ptr<RtpSelector> _parent;
@ -57,11 +58,12 @@ public:
/**
* rtp处理器
* @param app_name app名称 ""使 rtp
* @param stream_id id
* @param makeNew , true时
* @return rtp处理器
*/
RtpProcess::Ptr getProcess(const std::string &stream_id, bool makeNew);
RtpProcess::Ptr getProcess(const std::string &app_name, const std::string &stream_id, bool makeNew);
/**
* rtp处理器

View File

@ -30,8 +30,9 @@ class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> {
public:
using Ptr = std::shared_ptr<RtcpHelper>;
RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) {
RtcpHelper(Socket::Ptr rtcp_sock, std::string app_name, std::string stream_id) {
_rtcp_sock = std::move(rtcp_sock);
_app_name = app_name;
_stream_id = std::move(stream_id);
}
@ -60,7 +61,7 @@ public:
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true);
_process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true);
_process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
_process->setOnDetach(std::move(_on_detach));
cancelDelayTask();
@ -96,7 +97,7 @@ public:
GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec);
_delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() {
if (auto strong_self = weak_self.lock()) {
auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false);
auto process = RtpSelector::Instance().getProcess(strong_self->_app_name, strong_self->_stream_id, false);
if (!process && strong_self->_on_detach) {
strong_self->_on_detach();
}
@ -119,6 +120,10 @@ public:
}
}
string getAppName() { return _app_name; }
string getStreamId() { return _stream_id; }
private:
void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) {
// 每5秒发送一次rtcp
@ -150,13 +155,14 @@ private:
Ticker _ticker;
Socket::Ptr _rtcp_sock;
RtpProcess::Ptr _process;
std::string _app_name;
std::string _stream_id;
function<void()> _on_detach;
std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
EventPoller::DelayTask::Ptr _delay_task;
};
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
void RtpServer::start(uint16_t local_port, const string &app_name, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
//创建udp服务器
Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true);
Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true);
@ -199,7 +205,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
//增加了多路复用判断如果多路复用为true就走else逻辑同时保留了原来stream_id为空走else逻辑
if (!stream_id.empty() && !multiplex) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id);
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), app_name, stream_id);
helper->startRtcp();
helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track);
bool bind_peer_addr = false;
@ -252,6 +258,14 @@ uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port();
}
std::string RtpServer::getAppName() {
return _rtcp_helper ? _rtcp_helper->getAppName() : "";
}
std::string RtpServer::getStreamId() {
return _rtcp_helper ? _rtcp_helper->getStreamId() : "";
}
void RtpServer::connectToServer(const std::string &url, uint16_t port, const function<void(const SockException &ex)> &cb) {
if (_tcp_mode != ACTIVE || !_rtp_socket) {
cb(SockException(Err_other, "仅支持tcp主动模式"));

View File

@ -43,7 +43,8 @@ public:
* @param ssrc ssrc
* @param multiplex
*/
void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE,
void start(
uint16_t local_port, const std::string &app_name = "", const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE,
const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false);
/**
@ -59,6 +60,16 @@ public:
*/
uint16_t getPort();
/**
* app名称
*/
std::string getAppName();
/**
* stream id
*/
std::string getStreamId();
/**
* RtpProcess onDetach事件回调
*/

View File

@ -21,6 +21,7 @@ using namespace toolkit;
namespace mediakit{
const string RtpSession::kAppName = "app_name";
const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kSSRC = "ssrc";
const string RtpSession::kOnlyTrack = "only_track";
@ -31,6 +32,7 @@ void RtpSession::attachServer(const Server &server) {
}
void RtpSession::setParams(mINI &ini) {
_app_name = ini[kAppName];
_stream_id = ini[kStreamID];
_ssrc = ini[kSSRC];
_only_track = ini[kOnlyTrack];
@ -114,7 +116,7 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
_stream_id = printSSRC(_ssrc);
}
try {
_process = RtpSelector::Instance().getProcess(_stream_id, true);
_process = RtpSelector::Instance().getProcess(_app_name, _stream_id, true);
} catch (RtpSelector::ProcessExisted &ex) {
if (!_is_udp) {
// tcp情况下立即断开连接

View File

@ -22,6 +22,7 @@ namespace mediakit{
class RtpSession : public toolkit::Session, public RtpSplitter, public MediaSourceEvent {
public:
static const std::string kAppName;
static const std::string kStreamID;
static const std::string kSSRC;
static const std::string kOnlyTrack;
@ -55,6 +56,7 @@ private:
int _only_track = 0;
uint32_t _ssrc = 0;
toolkit::Ticker _ticker;
std::string _app_name;
std::string _stream_id;
struct sockaddr_storage _addr;
RtpProcess::Ptr _process;

View File

@ -42,7 +42,7 @@ static bool loadFile(const char *path, const EventPoller::Ptr &poller) {
memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET;
auto sock = Socket::createSocket(poller);
auto process = RtpSelector::Instance().getProcess("test", true);
auto process = RtpSelector::Instance().getProcess("", "test", true);
uint64_t stamp_last = 0;
auto total_size = std::make_shared<size_t>(0);