diff --git a/webrtc/IceServer.cpp b/webrtc/IceServer.cpp index 5e5d3b27..64c79a97 100644 --- a/webrtc/IceServer.cpp +++ b/webrtc/IceServer.cpp @@ -263,7 +263,7 @@ namespace RTC for (; it != this->tuples.end(); ++it) { - RTC::TransportTuple* storedTuple = it->get(); + RTC::TransportTuple* storedTuple = *it; if (storedTuple == tuple) { @@ -281,16 +281,16 @@ namespace RTC this->tuples.erase(it); // If this is not the selected tuple, stop here. - if (removedTuple != this->selectedTuple) + if (removedTuple != this->selectedTuple.lock().get()) return; // Otherwise this was the selected tuple. - this->selectedTuple = nullptr; + // this->selectedTuple = nullptr; // Mark the first tuple as selected tuple (if any). - if (this->tuples.begin() != this->tuples.end()) + if (!this->tuples.empty()) { - SetSelectedTuple(this->tuples.begin()->get()); + SetSelectedTuple(this->tuples.front()); } // Or just emit 'disconnected'. else @@ -306,8 +306,7 @@ namespace RTC { MS_TRACE(); - MS_ASSERT( - this->selectedTuple, "cannot force the selected tuple if there was not a selected tuple"); + MS_ASSERT(!this->selectedTuple.expired(), "cannot force the selected tuple if there was not a selected tuple"); auto* storedTuple = HasTuple(tuple); @@ -332,7 +331,7 @@ namespace RTC this->tuples.empty(), "state is 'new' but there are %zu tuples", this->tuples.size()); // There shouldn't be a selected tuple. - MS_ASSERT(!this->selectedTuple, "state is 'new' but there is selected tuple"); + MS_ASSERT(!this->selectedTuple.expired(), "state is 'new' but there is selected tuple"); if (!hasUseCandidate) { @@ -375,7 +374,7 @@ namespace RTC this->tuples.size()); // There shouldn't be a selected tuple. - MS_ASSERT(!this->selectedTuple, "state is 'disconnected' but there is selected tuple"); + MS_ASSERT(!this->selectedTuple.expired(), "state is 'disconnected' but there is selected tuple"); if (!hasUseCandidate) { @@ -415,7 +414,7 @@ namespace RTC MS_ASSERT(!this->tuples.empty(), "state is 'connected' but there are no tuples"); // There should be a selected tuple. - MS_ASSERT(this->selectedTuple, "state is 'connected' but there is not selected tuple"); + MS_ASSERT(!this->selectedTuple.expired(), "state is 'connected' but there is not selected tuple"); if (!hasUseCandidate) { @@ -450,7 +449,7 @@ namespace RTC MS_ASSERT(!this->tuples.empty(), "state is 'completed' but there are no tuples"); // There should be a selected tuple. - MS_ASSERT(this->selectedTuple, "state is 'completed' but there is not selected tuple"); + MS_ASSERT(!this->selectedTuple.expired(), "state is 'completed' but there is not selected tuple"); if (!hasUseCandidate) { @@ -480,7 +479,7 @@ namespace RTC MS_TRACE(); // Add the new tuple at the beginning of the list. - this->tuples.push_front(tuple->shared_from_this()); + this->tuples.push_front(tuple); // Return the address of the inserted tuple. return tuple; @@ -492,17 +491,17 @@ namespace RTC // If there is no selected tuple yet then we know that the tuples list // is empty. - if (!this->selectedTuple) + if (this->selectedTuple.expired()) return nullptr; // Check the current selected tuple. - if (selectedTuple == tuple) - return this->selectedTuple; + if (selectedTuple.lock().get() == tuple) + return this->selectedTuple.lock().get(); // Otherwise check other stored tuples. for (auto it : this->tuples) { - auto storedTuple = it.get(); + auto storedTuple = it; if (storedTuple == tuple) return storedTuple; } @@ -515,12 +514,12 @@ namespace RTC MS_TRACE(); // If already the selected tuple do nothing. - if (storedTuple == this->selectedTuple) + if (storedTuple == this->selectedTuple.lock().get()) return; - this->selectedTuple = storedTuple; + this->selectedTuple = storedTuple->shared_from_this(); // Notify the listener. - this->listener->OnIceServerSelectedTuple(this, this->selectedTuple); + this->listener->OnIceServerSelectedTuple(this, storedTuple); } } // namespace RTC diff --git a/webrtc/IceServer.hpp b/webrtc/IceServer.hpp index 7c0d9f50..57c3f65a 100644 --- a/webrtc/IceServer.hpp +++ b/webrtc/IceServer.hpp @@ -83,7 +83,7 @@ namespace RTC } RTC::TransportTuple* GetSelectedTuple() const { - return this->selectedTuple; + return this->selectedTuple.lock().get(); } void SetUsernameFragment(const std::string& usernameFragment) { @@ -101,7 +101,9 @@ namespace RTC // and the given tuple must be an already valid tuple. void ForceSelectedTuple(const RTC::TransportTuple* tuple); - private: + const std::list& GetTuples() const { return tuples; } + + private: void HandleTuple(RTC::TransportTuple* tuple, bool hasUseCandidate); /** * Store the given tuple and return its stored address. @@ -126,8 +128,8 @@ namespace RTC std::string oldUsernameFragment; std::string oldPassword; IceState state{ IceState::NEW }; - std::list> tuples; - RTC::TransportTuple* selectedTuple{ nullptr }; + std::list tuples; + std::weak_ptr selectedTuple; //最大不超过mtu static constexpr size_t StunSerializeBufferSize{ 1600 }; uint8_t StunSerializeBuffer[StunSerializeBufferSize]; diff --git a/webrtc/WebRtcPlayer.cpp b/webrtc/WebRtcPlayer.cpp index 3b533594..bbdb6274 100644 --- a/webrtc/WebRtcPlayer.cpp +++ b/webrtc/WebRtcPlayer.cpp @@ -70,21 +70,17 @@ void WebRtcPlayer::onStartWebRTC() { } } void WebRtcPlayer::onDestory() { - WebRtcTransportImp::onDestory(); - auto duration = getDuration(); auto bytes_usage = getBytesUsage(); //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (_reader && getSession()) { - WarnL << "RTC播放器(" - << _media_info.shortUrl() - << ")结束播放,耗时(s):" << duration; + WarnL << "RTC播放器(" << _media_info.shortUrl() << ")结束播放,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, - true, static_cast(*getSession())); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, true, static_cast(*getSession())); } } + WebRtcTransportImp::onDestory(); } void WebRtcPlayer::onRtcConfigure(RtcConfigure &configure) const { diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 2fdd5e9a..8fe6549c 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -118,20 +118,15 @@ void WebRtcPusher::onStartWebRTC() { } void WebRtcPusher::onDestory() { - WebRtcTransportImp::onDestory(); - auto duration = getDuration(); auto bytes_usage = getBytesUsage(); //流量统计事件广播 GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); if (getSession()) { - WarnL << "RTC推流器(" - << _media_info.shortUrl() - << ")结束推流,耗时(s):" << duration; + WarnL << "RTC推流器(" << _media_info.shortUrl() << ")结束推流,耗时(s):" << duration; if (bytes_usage >= iFlowThreshold * 1024) { - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, - false, static_cast(*getSession())); + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, false, static_cast(*getSession())); } } @@ -142,6 +137,7 @@ void WebRtcPusher::onDestory() { auto push_src = std::move(_push_src); getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; }); } + WebRtcTransportImp::onDestory(); } void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const { diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index e51c7eae..ade1ce20 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -48,8 +48,6 @@ EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) { //////////////////////////////////////////////////////////////////////////////// WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) { - socklen_t addr_len = sizeof(_peer_addr); - getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len); _over_tcp = sock->sockType() == SockNum::Sock_TCP; } @@ -87,14 +85,12 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) { //3、销毁原先的socket和WebRtcSession(原先的对象跟WebRtcTransport不在同一条线程) throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName()); } - - transport->setSession(shared_from_this()); _transport = std::move(transport); InfoP(this); } _ticker.resetTime(); CHECK(_transport); - _transport->inputSockData((char *)data, len, this);// (struct sockaddr *)&_peer_addr); + _transport->inputSockData((char *)data, len, this); } void WebRtcSession::onRecv(const Buffer::Ptr &buffer) { @@ -116,9 +112,11 @@ void WebRtcSession::onError(const SockException &err) { } auto self = shared_from_this(); auto transport = std::move(_transport); - getPoller()->async([transport, self] { + getPoller()->async([transport, self]() mutable { //延时减引用,防止使用transport对象时,销毁对象 - transport->RemoveTuple(self.get()); + transport->removeTuple(self.get()); + //确保transport在Session对象前销毁,防止WebRtcTransport::onDestory()时获取不到Session对象 + transport = nullptr; }, false); } diff --git a/webrtc/WebRtcSession.h b/webrtc/WebRtcSession.h index 6ce881ba..f70d5e74 100644 --- a/webrtc/WebRtcSession.h +++ b/webrtc/WebRtcSession.h @@ -46,7 +46,6 @@ private: bool _over_tcp = false; bool _find_transport = true; Ticker _ticker; - struct sockaddr_storage _peer_addr; std::weak_ptr _server; WebRtcTransportImp::Ptr _transport; }; diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 1e5e3ff9..3a57e2f3 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -78,10 +78,9 @@ static void translateIPFromEnv(std::vector &v) { const char* sockTypeStr(Session* session) { if (session) { switch (session->getSock()->sockType()) { - case SockNum::Sock_TCP: - return "tcp"; - case SockNum::Sock_UDP: - return "udp"; + case SockNum::Sock_TCP: return "tcp"; + case SockNum::Sock_UDP: return "udp"; + default: break; } } return "unknown"; @@ -123,6 +122,8 @@ void WebRtcTransport::OnIceServerSendStunPacket( void WebRtcTransportImp::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) { InfoL << getIdentifier() << " select tuple " << sockTypeStr(tuple) << " " << tuple->get_peer_ip() << ":" << tuple->get_peer_port(); + tuple->setSendFlushFlag(false); + unrefSelf(); } void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) { @@ -227,8 +228,9 @@ void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTu onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple()); } -RTC::TransportTuple *WebRtcTransport::getSelectedTuple() const { - return _ice_server->GetSelectedTuple(); +Session::Ptr WebRtcTransport::getSession() const { + auto tuple = _ice_server->GetSelectedTuple(); + return tuple ? tuple->shared_from_this() : nullptr; } void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) { @@ -1055,32 +1057,16 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx void WebRtcTransportImp::onShutdown(const SockException &ex) { WarnL << ex.what(); unrefSelf(); - for (auto &pr : _history_sessions) { - auto session = pr.second.lock(); - if (session) { - session->shutdown(ex); - } + for (auto &tuple : _ice_server->GetTuples()) { + tuple->shutdown(ex); } } -void WebRtcTransportImp::RemoveTuple(RTC::TransportTuple* tuple) -{ - InfoL << getIdentifier() << " RemoveTuple " << tuple->get_peer_ip() << ":" << tuple->get_peer_port(); - this->_history_sessions.erase(tuple); +void WebRtcTransportImp::removeTuple(RTC::TransportTuple *tuple) { + InfoL << getIdentifier() << " remove tuple " << tuple->get_peer_ip() << ":" << tuple->get_peer_port(); this->_ice_server->RemoveTuple(tuple); } -void WebRtcTransportImp::setSession(Session::Ptr session) { - _history_sessions.emplace(session.get(), session); - session->setSendFlushFlag(false); - unrefSelf(); -} - -const Session::Ptr &WebRtcTransportImp::getSession() const { - Session* ret = _ice_server?_ice_server->GetSelectedTuple():nullptr; - return ret ? ret->shared_from_this() : nullptr; -} - uint64_t WebRtcTransportImp::getBytesUsage() const { return _bytes_usage; } diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index f4595501..dfae8012 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -110,6 +110,7 @@ public: void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr); const EventPoller::Ptr& getPoller() const; + Session::Ptr getSession() const; protected: //// dtls相关的回调 //// @@ -158,7 +159,6 @@ protected: virtual void onRtcpBye() = 0; protected: - RTC::TransportTuple* getSelectedTuple() const; void sendRtcpRemb(uint32_t ssrc, size_t bit_rate); void sendRtcpPli(uint32_t ssrc); @@ -238,8 +238,6 @@ public: using Ptr = std::shared_ptr; ~WebRtcTransportImp() override; - void setSession(Session::Ptr session); - const Session::Ptr& getSession() const; uint64_t getBytesUsage() const; uint64_t getDuration() const; bool canSendRtp() const; @@ -247,8 +245,8 @@ public: void onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx = false); void createRtpChannel(const std::string &rid, uint32_t ssrc, MediaTrack &track); + void removeTuple(RTC::TransportTuple* tuple); - void RemoveTuple(RTC::TransportTuple* tuple); protected: void OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) override; WebRtcTransportImp(const EventPoller::Ptr &poller,bool preferred_tcp = false); @@ -293,8 +291,6 @@ private: Ticker _alive_ticker; //pli rtcp计时器 Ticker _pli_ticker; - //链接迁移前后使用过的udp链接 - std::unordered_map > _history_sessions; //twcc rtcp发送上下文对象 TwccContext _twcc_ctx; //根据发送rtp的track类型获取相关信息