diff --git a/api/include/mk_rtp_server.h b/api/include/mk_rtp_server.h index bd7f594c..20a0c5ef 100644 --- a/api/include/mk_rtp_server.h +++ b/api/include/mk_rtp_server.h @@ -19,11 +19,27 @@ typedef void* mk_rtp_server; /** * 创建GB28181 RTP 服务器 * @param port 监听端口,0则为随机 - * @param enable_tcp 创建udp端口时是否同时监听tcp端口 + * @param tcp_mode tcp模式(0: 不监听端口 1: 监听端口 2: 主动连接到服务端) * @param stream_id 该端口绑定的流id * @return */ -API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int enable_tcp, const char *stream_id); +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *stream_id); + +/** + * TCP 主动模式时连接到服务器是否成功的回调 + */ +typedef void(API_CALL *on_mk_rtp_server_connected)(void *user_data, int err, const char *what, int sys_err); + +/** + * TCP 主动模式时连接到服务器 + * @param @param ctx 服务器对象 + * @param dst_url 服务端地址 + * @param dst_port 服务端端口 + * @param cb 连接到服务器是否成功的回调 + * @param user_data 用户数据指针 + * @return + */ +API_EXPORT void API_CALL mk_rtp_server_connect(mk_rtp_server ctx, const char *dst_url, uint16_t dst_port, on_mk_rtp_server_connected cb, void *user_data); /** * 销毁GB28181 RTP 服务器 diff --git a/api/source/mk_rtp_server.cpp b/api/source/mk_rtp_server.cpp index 16dfa74d..617418c6 100644 --- a/api/source/mk_rtp_server.cpp +++ b/api/source/mk_rtp_server.cpp @@ -16,23 +16,34 @@ using namespace toolkit; #include "Rtp/RtpServer.h" using namespace mediakit; -API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int enable_tcp, const char *stream_id){ +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int tcp_mode, const char *stream_id) { RtpServer::Ptr *server = new RtpServer::Ptr(new RtpServer); - (*server)->start(port, stream_id, enable_tcp); + (*server)->start(port, stream_id, (RtpServer::TcpMode)tcp_mode); return server; } -API_EXPORT void API_CALL mk_rtp_server_release(mk_rtp_server ctx){ +API_EXPORT void API_CALL mk_rtp_server_connect(mk_rtp_server ctx, const char *dst_url, uint16_t dst_port, on_mk_rtp_server_connected cb, void *user_data) { + RtpServer::Ptr *server = (RtpServer::Ptr *)ctx; + if (server) { + (*server)->connectToServer(dst_url, dst_port, [cb, user_data](const SockException &ex) { + if (cb) { + cb(user_data, ex.getErrCode(), ex.what(), ex.getCustomCode()); + } + }); + } +} + +API_EXPORT void API_CALL mk_rtp_server_release(mk_rtp_server ctx) { RtpServer::Ptr *server = (RtpServer::Ptr *)ctx; delete server; } -API_EXPORT uint16_t API_CALL mk_rtp_server_port(mk_rtp_server ctx){ +API_EXPORT uint16_t API_CALL mk_rtp_server_port(mk_rtp_server ctx) { RtpServer::Ptr *server = (RtpServer::Ptr *)ctx; return (*server)->getPort(); } -API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rtp_server_detach cb, void *user_data){ +API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rtp_server_detach cb, void *user_data) { RtpServer::Ptr *server = (RtpServer::Ptr *) ctx; if (cb) { (*server)->setOnDetach([cb, user_data]() { @@ -45,21 +56,21 @@ API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rt #else -API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int enable_tcp, const char *stream_id){ +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int enable_tcp, const char *stream_id) { WarnL << "请打开ENABLE_RTPPROXY后再编译"; return nullptr; } -API_EXPORT void API_CALL mk_rtp_server_release(mk_rtp_server ctx){ +API_EXPORT void API_CALL mk_rtp_server_release(mk_rtp_server ctx) { WarnL << "请打开ENABLE_RTPPROXY后再编译"; } -API_EXPORT uint16_t API_CALL mk_rtp_server_port(mk_rtp_server ctx){ +API_EXPORT uint16_t API_CALL mk_rtp_server_port(mk_rtp_server ctx) { WarnL << "请打开ENABLE_RTPPROXY后再编译"; return 0; } -API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rtp_server_detach cb, void *user_data){ +API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rtp_server_detach cb, void *user_data) { WarnL << "请打开ENABLE_RTPPROXY后再编译"; } diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index e456ed08..36654ae0 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1384,9 +1384,9 @@ "description": "绑定的端口,0时为随机端口" }, { - "key": "enable_tcp", + "key": "tcp_mode", "value": "1", - "description": "创建 udp端口时是否同时监听tcp端口" + "description": "tcp模式,0时为不启用tcp监听,1时为启用tcp监听,2时为tcp主动连接模式" }, { "key": "stream_id", @@ -1410,6 +1410,47 @@ }, "response": [] }, + { + "name": "连接RTP服务器(connectRtpServer)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/connectRtpServer?secret={{ZLMediaKit_secret}}&dst_url=127.0.0.1&dst_port=10000&stream_id=test", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "connectRtpServer" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "dst_url", + "value": "0", + "description": "tcp主动模式时服务端地址" + }, + { + "key": "dst_port", + "value": "1", + "description": "tcp主动模式时服务端端口" + }, + { + "key": "stream_id", + "value": "test", + "description": "OpenRtpServer时绑定的流id\n" + } + ] + } + }, + "response": [] + }, { "name": "关闭RTP服务器(closeRtpServer)", "request": { diff --git a/server/WebApi.cpp b/server/WebApi.cpp index ad3b8a27..d72dea53 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -383,7 +383,7 @@ Value makeMediaSourceJson(MediaSource &media){ } #if defined(ENABLE_RTPPROXY) -uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable_tcp, const string &local_ip, bool re_use_port, uint32_t ssrc) { +uint16_t openRtpServer(uint16_t local_port, const string &stream_id, int tcp_mode, const string &local_ip, bool re_use_port, uint32_t ssrc) { lock_guard lck(s_rtpServerMapMtx); if (s_rtpServerMap.find(stream_id) != s_rtpServerMap.end()) { //为了防止RtpProcess所有权限混乱的问题,不允许重复添加相同的stream_id @@ -391,7 +391,7 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable } RtpServer::Ptr server = std::make_shared(); - server->start(local_port, stream_id, enable_tcp, local_ip.c_str(), re_use_port, ssrc); + server->start(local_port, stream_id, (RtpServer::TcpMode)tcp_mode, local_ip.c_str(), re_use_port, ssrc); server->setOnDetach([stream_id]() { //设置rtp超时移除事件 lock_guard lck(s_rtpServerMapMtx); @@ -404,6 +404,16 @@ uint16_t openRtpServer(uint16_t local_port, const string &stream_id, bool enable return server->getPort(); } +void connectRtpServer(const string &stream_id, const string &dst_url, uint16_t dst_port, const function &cb) { + lock_guard lck(s_rtpServerMapMtx); + auto it = s_rtpServerMap.find(stream_id); + if (it == s_rtpServerMap.end()) { + cb(SockException(Err_other, "未找到rtp服务")); + return; + } + it->second->connectToServer(dst_url, dst_port, cb); +} + bool closeRtpServer(const string &stream_id) { lock_guard lck(s_rtpServerMapMtx); auto it = s_rtpServerMap.find(stream_id); @@ -1135,17 +1145,36 @@ void installWebApi() { api_regist("/index/api/openRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); - CHECK_ARGS("port", "enable_tcp", "stream_id"); + CHECK_ARGS("port", "stream_id"); auto stream_id = allArgs["stream_id"]; - auto port = openRtpServer(allArgs["port"], stream_id, allArgs["enable_tcp"].as(), "::", - allArgs["re_use_port"].as(), allArgs["ssrc"].as()); - if(port == 0) { + auto tcp_mode = allArgs["tcp_mode"].as(); + if (allArgs["enable_tcp"].as() && !tcp_mode) { + //兼容老版本请求,新版本去除enable_tcp参数并新增tcp_mode参数 + tcp_mode = 1; + } + auto port = openRtpServer(allArgs["port"], stream_id, tcp_mode, "::", allArgs["re_use_port"].as(), + allArgs["ssrc"].as()); + if (port == 0) { throw InvalidArgsException("该stream_id已存在"); } //回复json val["port"] = port; }); + api_regist("/index/api/connectRtpServer", [](API_ARGS_MAP_ASYNC) { + CHECK_SECRET(); + CHECK_ARGS("stream_id", "dst_url", "dst_port"); + connectRtpServer( + allArgs["stream_id"], allArgs["dst_url"], allArgs["dst_port"], + [val, headerOut, invoker](const SockException &ex) mutable { + if (ex) { + val["code"] = API::OtherFailed; + val["msg"] = ex.what(); + } + invoker(200, headerOut, val.toStyledString()); + }); + }); + api_regist("/index/api/closeRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("stream_id"); diff --git a/server/WebApi.h b/server/WebApi.h index 019e3586..b4287c97 100755 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -231,7 +231,8 @@ bool checkArgs(Args &args, const First &first, const KeyTypes &...keys) { void installWebApi(); void unInstallWebApi(); -uint16_t openRtpServer(uint16_t local_port, const std::string &stream_id, bool enable_tcp, const std::string &local_ip, bool re_use_port, uint32_t ssrc); +uint16_t openRtpServer(uint16_t local_port, const std::string &stream_id, int tcp_mode, const std::string &local_ip, bool re_use_port, uint32_t ssrc); +void connectRtpServer(const std::string &stream_id, const std::string &dst_url, uint16_t dst_port, const std::function &cb); bool closeRtpServer(const std::string &stream_id); Json::Value makeMediaSourceJson(mediakit::MediaSource &media); void getStatisticJson(const std::function &cb); diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 8c596357..42aef90a 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -22,8 +22,8 @@ namespace mediakit{ RtpServer::RtpServer() {} RtpServer::~RtpServer() { - if(_on_clearup){ - _on_clearup(); + if (_on_cleanup) { + _on_cleanup(); } } @@ -90,7 +90,7 @@ private: std::shared_ptr _rtcp_addr; }; -void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip, bool re_use_port, uint32_t ssrc) { +void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc) { //创建udp服务器 Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true); Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true); @@ -110,13 +110,19 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_ SockUtil::setRecvBuf(rtp_socket->rawFD(), 4 * 1024 * 1024); TcpServer::Ptr tcp_server; - if (enable_tcp) { + _tcp_mode = tcp_mode; + if (tcp_mode == PASSIVE || tcp_mode == ACTIVE) { //创建tcp服务器 tcp_server = std::make_shared(rtp_socket->getPoller()); (*tcp_server)[RtpSession::kStreamID] = stream_id; (*tcp_server)[RtpSession::kIsUDP] = 0; (*tcp_server)[RtpSession::kSSRC] = ssrc; - tcp_server->start(rtp_socket->get_local_port(), local_ip); + if (tcp_mode == PASSIVE) { + tcp_server->start(rtp_socket->get_local_port(), local_ip); + } else if (stream_id.empty()) { + // tcp主动模式时只能一个端口一个流,必须指定流id; 创建TcpServer对象也仅用于传参 + throw std::runtime_error(StrPrinter << "tcp主动模式时必需指定流id"); + } } //创建udp服务器 @@ -125,18 +131,21 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_ if (!stream_id.empty()) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) process = RtpSelector::Instance().getProcess(stream_id, true); - RtcpHelper::Ptr helper = std::make_shared(std::move(rtcp_socket), process); - helper->startRtcp(); - rtp_socket->setOnRead([rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { - RtpHeader *header = (RtpHeader *)buf->data(); - auto rtp_ssrc = ntohl(header->ssrc); - if (ssrc && rtp_ssrc != ssrc) { - WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc; - } else { - process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr); - helper->onRecvRtp(buf, addr, addr_len); - } - }); + if (tcp_mode != ACTIVE) { + RtcpHelper::Ptr helper = std::make_shared(std::move(rtcp_socket), process); + helper->startRtcp(); + rtp_socket->setOnRead( + [rtp_socket, process, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + RtpHeader *header = (RtpHeader *)buf->data(); + auto rtp_ssrc = ntohl(header->ssrc); + if (ssrc && rtp_ssrc != ssrc) { + WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc; + } else { + process->inputRtp(true, rtp_socket, buf->data(), buf->size(), addr); + helper->onRecvRtp(buf, addr, addr_len); + } + }); + } } else { #if 1 //单端口多线程接收多个流,根据ssrc区分流 @@ -153,7 +162,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_ #endif } - _on_clearup = [rtp_socket, process, stream_id]() { + _on_cleanup = [rtp_socket, process, stream_id]() { if (rtp_socket) { //去除循环引用 rtp_socket->setOnRead(nullptr); @@ -180,5 +189,42 @@ uint16_t RtpServer::getPort() { return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port(); } +void RtpServer::connectToServer(const std::string &url, uint16_t port, const function &cb) { + if (_tcp_mode != ACTIVE || !_rtp_socket) { + cb(SockException(Err_other, "仅支持tcp主动模式")); + return; + } + weak_ptr weak_self = shared_from_this(); + _rtp_socket->connect(url, port, [url, port, cb, weak_self](const SockException &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + cb(SockException(Err_other, "服务对象已释放")); + return; + } + if (err) { + WarnL << "连接到服务器 " << url << ":" << port << " 失败 " << err.what(); + } else { + InfoL << "连接到服务器 " << url << ":" << port << " 成功"; + strong_self->onConnect(); + } + cb(err); + }, + 5.0F, "::", _rtp_socket->get_local_port()); +} + +void RtpServer::onConnect() { + auto rtp_session = std::make_shared(_rtp_socket); + rtp_session->attachServer(*_tcp_server); + _rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + rtp_session->onRecv(buf); + }); + weak_ptr weak_self = shared_from_this(); + _rtp_socket->setOnErr([weak_self](const SockException &err) { + if (auto strong_self = weak_self.lock()) { + strong_self->_rtp_socket->setOnRead(nullptr); + } + }); +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index e68abb5c..ff2480be 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -23,10 +23,11 @@ namespace mediakit{ /** * RTP服务器,支持UDP/TCP */ -class RtpServer { +class RtpServer : public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; using onRecv = std::function; + enum TcpMode { NONE = 0, PASSIVE, ACTIVE }; RtpServer(); ~RtpServer(); @@ -35,13 +36,22 @@ public: * 开启服务器,可能抛异常 * @param local_port 本地端口,0时为随机端口 * @param stream_id 流id,置空则使用ssrc - * @param enable_tcp 是否启用tcp服务器 + * @param tcp_mode tcp服务模式 * @param local_ip 绑定的本地网卡ip * @param re_use_port 是否设置socket为re_use属性 + * @param ssrc 指定的ssrc */ - void start(uint16_t local_port, const std::string &stream_id = "", bool enable_tcp = true, + void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE, const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0); + /** + * 连接到tcp服务(tcp主动模式) + * @param url 服务器地址 + * @param port 服务器端口 + * @param cb 连接服务器是否成功的回调 + */ + void connectToServer(const std::string &url, uint16_t port, const std::function &cb); + /** * 获取绑定的本地端口 */ @@ -52,12 +62,19 @@ public: */ void setOnDetach(const std::function &cb); +private: + // tcp主动模式连接服务器成功回调 + void onConnect(); + protected: toolkit::Socket::Ptr _rtp_socket; toolkit::UdpServer::Ptr _udp_server; toolkit::TcpServer::Ptr _tcp_server; RtpProcess::Ptr _rtp_process; - std::function _on_clearup; + std::function _on_cleanup; + + //用于tcp主动模式 + TcpMode _tcp_mode = NONE; }; }//namespace mediakit