diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index b4eb0998..25473d4e 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -118,7 +118,7 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 /** * rtp推流成功与否的回调(第一次成功后,后面将一直重试) */ -typedef void(API_CALL *on_mk_media_source_send_rtp_result)(void *user_data, int err, const char *msg); +typedef void(API_CALL *on_mk_media_source_send_rtp_result)(void *user_data, uint16_t local_port, int err, const char *msg); //MediaSource::startSendRtp,请参考mk_media_start_send_rtp,注意ctx参数类型不一样 API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data); diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 8e120c6f..23dd9f33 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -214,9 +214,9 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data){ assert(ctx && dst_url && ssrc); MediaSource *src = (MediaSource *)ctx; - src->startSendRtp(dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](const SockException &ex){ + src->startSendRtp(dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ if (cb) { - cb(user_data, ex.getErrCode(), ex.what()); + cb(user_data, local_port, ex.getErrCode(), ex.what()); } }); } diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index d2d4dcc3..e0fd39da 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -193,9 +193,9 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; //sender参数无用 - (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](const SockException &ex){ + (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](uint16_t local_port, const SockException &ex){ if (cb) { - cb(user_data, ex.getErrCode(), ex.what()); + cb(user_data, local_port, ex.getErrCode(), ex.what()); } }); } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index dd7055b5..6f4d4f0b 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -812,11 +812,12 @@ void installWebApi() { } //src_port为空时,则随机本地端口 - src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], allArgs["src_port"], [val, headerOut, invoker](const SockException &ex){ + src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], allArgs["src_port"], [val, headerOut, invoker](uint16_t local_port, const SockException &ex){ if (ex) { const_cast(val)["code"] = API::OtherFailed; const_cast(val)["msg"] = ex.what(); } + const_cast(val)["local_port"] = local_port; invoker("200 OK", headerOut, val.toStyledString()); }); }); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 966620f7..92a9bd22 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -183,10 +183,10 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } -void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ +void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ auto listener = _listener.lock(); if (!listener) { - cb(SockException(Err_other, "尚未设置事件监听器")); + cb(0, SockException(Err_other, "尚未设置事件监听器")); return; } return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, src_port, cb); @@ -642,7 +642,7 @@ vector MediaSourceEventInterceptor::getTracks(MediaSource &sender, b return listener->getTracks(sender, trackReady); } -void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ auto listener = _listener.lock(); if (listener) { listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb); diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 0095ee22..41a228c2 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -83,7 +83,7 @@ public: // 获取所有track相关信息 virtual vector getTracks(MediaSource &sender, bool trackReady = true) const { return vector(); }; // 开始发送ps-rtp - virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) { cb(SockException(Err_other, "not implemented"));}; + virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) { cb(0, SockException(Err_other, "not implemented"));}; // 停止发送ps-rtp virtual bool stopSendRtp(MediaSource &sender, const string &ssrc) {return false; } @@ -112,7 +112,7 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; vector getTracks(MediaSource &sender, bool trackReady = true) const override; - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; bool stopSendRtp(MediaSource &sender, const string &ssrc) override; private: @@ -256,7 +256,7 @@ public: // 获取录制状态 bool isRecording(Recorder::type type); // 开始发送ps-rtp - void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb); + void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb); // 停止发送ps-rtp bool stopSendRtp(const string &ssrc); diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index f2ed90c3..0933c0d6 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -329,12 +329,12 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ #if defined(ENABLE_RTPPROXY) RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); - rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](const SockException &ex) { - cb(ex); + rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](uint16_t local_port, const SockException &ex) { + cb(local_port, ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { return; diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 2ffe8163..d1a19032 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -142,7 +142,7 @@ public: * @param is_udp 是否为udp * @param cb 启动成功或失败回调 */ - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; /** * 停止ps-rtp发送 diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 50710a1e..203b2567 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -26,7 +26,7 @@ RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) { RtpSender::~RtpSender() { } -void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb){ +void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb){ _is_udp = is_udp; _socket = Socket::createSocket(_poller, false); _dst_url = dst_url; @@ -36,21 +36,22 @@ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, if (is_udp) { _socket->bindUdpSock(src_port); auto poller = _poller; - WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() { + auto local_port = _socket->get_local_port(); + WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller, local_port]() { struct sockaddr addr; //切换线程目的是为了dns解析放在后台线程执行 if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) { - poller->async([dst_url, cb]() { + poller->async([dst_url, cb, local_port]() { //切回自己的线程 - cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); + cb(local_port, SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); }); return; } //dns解析成功 - poller->async([addr, weak_self, cb]() { + poller->async([addr, weak_self, cb, local_port]() { //切回自己的线程 - cb(SockException()); + cb(local_port, SockException()); auto strong_self = weak_self.lock(); if (strong_self) { strong_self->_socket->setSendPeerAddr(&addr); @@ -60,11 +61,15 @@ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, }); } else { _socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) { - cb(err); auto strong_self = weak_self.lock(); - if (strong_self && !err) { - //tcp连接成功 - strong_self->onConnect(); + if (strong_self) { + if (!err) { + //tcp连接成功 + strong_self->onConnect(); + } + cb(strong_self->_socket->get_local_port(), err); + } else { + cb(0, err); } }, 5.0F, "0.0.0.0", src_port); } @@ -87,6 +92,8 @@ void RtpSender::onConnect(){ strong_self->onErr(err); } }); + //获取本地端口,断开重连后确保端口不变 + _src_port = _socket->get_local_port(); InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; } @@ -150,7 +157,7 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) { if (!strong_self) { return false; } - strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](const SockException &ex){ + strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](uint16_t local_port, const SockException &ex){ auto strong_self = weak_self.lock(); if (strong_self && ex) { //连接失败且本对象未销毁,那么重试连接 diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 840792f7..684c6fd4 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -37,7 +37,7 @@ public: * @param is_udp 是否采用udp方式发送rtp * @param cb 连接目标端口是否成功的回调 */ - void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb); + void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb); /** * 输入帧数据