精简代码,修复webrtc客户端断开时不触发流量统计事件的bug
This commit is contained in:
parent
ff554f4cb0
commit
24a1b09dd1
|
|
@ -263,7 +263,7 @@ namespace RTC
|
|||
|
||||
for (; it != this->tuples.end(); ++it)
|
||||
{
|
||||
RTC::TransportTuple* storedTuple = it->get();
|
||||
RTC::TransportTuple* storedTuple = *it;
|
||||
|
||||
if (storedTuple == tuple)
|
||||
{
|
||||
|
|
@ -281,16 +281,16 @@ namespace RTC
|
|||
this->tuples.erase(it);
|
||||
|
||||
// If this is not the selected tuple, stop here.
|
||||
if (removedTuple != this->selectedTuple)
|
||||
if (removedTuple != this->selectedTuple.lock().get())
|
||||
return;
|
||||
|
||||
// Otherwise this was the selected tuple.
|
||||
this->selectedTuple = nullptr;
|
||||
// this->selectedTuple = nullptr;
|
||||
|
||||
// Mark the first tuple as selected tuple (if any).
|
||||
if (this->tuples.begin() != this->tuples.end())
|
||||
if (!this->tuples.empty())
|
||||
{
|
||||
SetSelectedTuple(this->tuples.begin()->get());
|
||||
SetSelectedTuple(this->tuples.front());
|
||||
}
|
||||
// Or just emit 'disconnected'.
|
||||
else
|
||||
|
|
@ -306,8 +306,7 @@ namespace RTC
|
|||
{
|
||||
MS_TRACE();
|
||||
|
||||
MS_ASSERT(
|
||||
this->selectedTuple, "cannot force the selected tuple if there was not a selected tuple");
|
||||
MS_ASSERT(!this->selectedTuple.expired(), "cannot force the selected tuple if there was not a selected tuple");
|
||||
|
||||
auto* storedTuple = HasTuple(tuple);
|
||||
|
||||
|
|
@ -332,7 +331,7 @@ namespace RTC
|
|||
this->tuples.empty(), "state is 'new' but there are %zu tuples", this->tuples.size());
|
||||
|
||||
// There shouldn't be a selected tuple.
|
||||
MS_ASSERT(!this->selectedTuple, "state is 'new' but there is selected tuple");
|
||||
MS_ASSERT(!this->selectedTuple.expired(), "state is 'new' but there is selected tuple");
|
||||
|
||||
if (!hasUseCandidate)
|
||||
{
|
||||
|
|
@ -375,7 +374,7 @@ namespace RTC
|
|||
this->tuples.size());
|
||||
|
||||
// There shouldn't be a selected tuple.
|
||||
MS_ASSERT(!this->selectedTuple, "state is 'disconnected' but there is selected tuple");
|
||||
MS_ASSERT(!this->selectedTuple.expired(), "state is 'disconnected' but there is selected tuple");
|
||||
|
||||
if (!hasUseCandidate)
|
||||
{
|
||||
|
|
@ -415,7 +414,7 @@ namespace RTC
|
|||
MS_ASSERT(!this->tuples.empty(), "state is 'connected' but there are no tuples");
|
||||
|
||||
// There should be a selected tuple.
|
||||
MS_ASSERT(this->selectedTuple, "state is 'connected' but there is not selected tuple");
|
||||
MS_ASSERT(!this->selectedTuple.expired(), "state is 'connected' but there is not selected tuple");
|
||||
|
||||
if (!hasUseCandidate)
|
||||
{
|
||||
|
|
@ -450,7 +449,7 @@ namespace RTC
|
|||
MS_ASSERT(!this->tuples.empty(), "state is 'completed' but there are no tuples");
|
||||
|
||||
// There should be a selected tuple.
|
||||
MS_ASSERT(this->selectedTuple, "state is 'completed' but there is not selected tuple");
|
||||
MS_ASSERT(!this->selectedTuple.expired(), "state is 'completed' but there is not selected tuple");
|
||||
|
||||
if (!hasUseCandidate)
|
||||
{
|
||||
|
|
@ -480,7 +479,7 @@ namespace RTC
|
|||
MS_TRACE();
|
||||
|
||||
// Add the new tuple at the beginning of the list.
|
||||
this->tuples.push_front(tuple->shared_from_this());
|
||||
this->tuples.push_front(tuple);
|
||||
|
||||
// Return the address of the inserted tuple.
|
||||
return tuple;
|
||||
|
|
@ -492,17 +491,17 @@ namespace RTC
|
|||
|
||||
// If there is no selected tuple yet then we know that the tuples list
|
||||
// is empty.
|
||||
if (!this->selectedTuple)
|
||||
if (this->selectedTuple.expired())
|
||||
return nullptr;
|
||||
|
||||
// Check the current selected tuple.
|
||||
if (selectedTuple == tuple)
|
||||
return this->selectedTuple;
|
||||
if (selectedTuple.lock().get() == tuple)
|
||||
return this->selectedTuple.lock().get();
|
||||
|
||||
// Otherwise check other stored tuples.
|
||||
for (auto it : this->tuples)
|
||||
{
|
||||
auto storedTuple = it.get();
|
||||
auto storedTuple = it;
|
||||
if (storedTuple == tuple)
|
||||
return storedTuple;
|
||||
}
|
||||
|
|
@ -515,12 +514,12 @@ namespace RTC
|
|||
MS_TRACE();
|
||||
|
||||
// If already the selected tuple do nothing.
|
||||
if (storedTuple == this->selectedTuple)
|
||||
if (storedTuple == this->selectedTuple.lock().get())
|
||||
return;
|
||||
|
||||
this->selectedTuple = storedTuple;
|
||||
this->selectedTuple = storedTuple->shared_from_this();
|
||||
|
||||
// Notify the listener.
|
||||
this->listener->OnIceServerSelectedTuple(this, this->selectedTuple);
|
||||
this->listener->OnIceServerSelectedTuple(this, storedTuple);
|
||||
}
|
||||
} // namespace RTC
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ namespace RTC
|
|||
}
|
||||
RTC::TransportTuple* GetSelectedTuple() const
|
||||
{
|
||||
return this->selectedTuple;
|
||||
return this->selectedTuple.lock().get();
|
||||
}
|
||||
void SetUsernameFragment(const std::string& usernameFragment)
|
||||
{
|
||||
|
|
@ -101,7 +101,9 @@ namespace RTC
|
|||
// and the given tuple must be an already valid tuple.
|
||||
void ForceSelectedTuple(const RTC::TransportTuple* tuple);
|
||||
|
||||
private:
|
||||
const std::list<RTC::TransportTuple *>& GetTuples() const { return tuples; }
|
||||
|
||||
private:
|
||||
void HandleTuple(RTC::TransportTuple* tuple, bool hasUseCandidate);
|
||||
/**
|
||||
* Store the given tuple and return its stored address.
|
||||
|
|
@ -126,8 +128,8 @@ namespace RTC
|
|||
std::string oldUsernameFragment;
|
||||
std::string oldPassword;
|
||||
IceState state{ IceState::NEW };
|
||||
std::list<std::shared_ptr<RTC::TransportTuple>> tuples;
|
||||
RTC::TransportTuple* selectedTuple{ nullptr };
|
||||
std::list<RTC::TransportTuple *> tuples;
|
||||
std::weak_ptr<RTC::TransportTuple> selectedTuple;
|
||||
//最大不超过mtu
|
||||
static constexpr size_t StunSerializeBufferSize{ 1600 };
|
||||
uint8_t StunSerializeBuffer[StunSerializeBufferSize];
|
||||
|
|
|
|||
|
|
@ -70,21 +70,17 @@ void WebRtcPlayer::onStartWebRTC() {
|
|||
}
|
||||
}
|
||||
void WebRtcPlayer::onDestory() {
|
||||
WebRtcTransportImp::onDestory();
|
||||
|
||||
auto duration = getDuration();
|
||||
auto bytes_usage = getBytesUsage();
|
||||
//流量统计事件广播
|
||||
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
|
||||
if (_reader && getSession()) {
|
||||
WarnL << "RTC播放器("
|
||||
<< _media_info.shortUrl()
|
||||
<< ")结束播放,耗时(s):" << duration;
|
||||
WarnL << "RTC播放器(" << _media_info.shortUrl() << ")结束播放,耗时(s):" << duration;
|
||||
if (bytes_usage >= iFlowThreshold * 1024) {
|
||||
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration,
|
||||
true, static_cast<SockInfo &>(*getSession()));
|
||||
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, true, static_cast<SockInfo &>(*getSession()));
|
||||
}
|
||||
}
|
||||
WebRtcTransportImp::onDestory();
|
||||
}
|
||||
|
||||
void WebRtcPlayer::onRtcConfigure(RtcConfigure &configure) const {
|
||||
|
|
|
|||
|
|
@ -118,20 +118,15 @@ void WebRtcPusher::onStartWebRTC() {
|
|||
}
|
||||
|
||||
void WebRtcPusher::onDestory() {
|
||||
WebRtcTransportImp::onDestory();
|
||||
|
||||
auto duration = getDuration();
|
||||
auto bytes_usage = getBytesUsage();
|
||||
//流量统计事件广播
|
||||
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
|
||||
|
||||
if (getSession()) {
|
||||
WarnL << "RTC推流器("
|
||||
<< _media_info.shortUrl()
|
||||
<< ")结束推流,耗时(s):" << duration;
|
||||
WarnL << "RTC推流器(" << _media_info.shortUrl() << ")结束推流,耗时(s):" << duration;
|
||||
if (bytes_usage >= iFlowThreshold * 1024) {
|
||||
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration,
|
||||
false, static_cast<SockInfo &>(*getSession()));
|
||||
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, bytes_usage, duration, false, static_cast<SockInfo &>(*getSession()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -142,6 +137,7 @@ void WebRtcPusher::onDestory() {
|
|||
auto push_src = std::move(_push_src);
|
||||
getPoller()->doDelayTask(_continue_push_ms, [push_src]() { return 0; });
|
||||
}
|
||||
WebRtcTransportImp::onDestory();
|
||||
}
|
||||
|
||||
void WebRtcPusher::onRtcConfigure(RtcConfigure &configure) const {
|
||||
|
|
|
|||
|
|
@ -48,8 +48,6 @@ EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) {
|
||||
socklen_t addr_len = sizeof(_peer_addr);
|
||||
getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
|
||||
_over_tcp = sock->sockType() == SockNum::Sock_TCP;
|
||||
}
|
||||
|
||||
|
|
@ -87,14 +85,12 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) {
|
|||
//3、销毁原先的socket和WebRtcSession(原先的对象跟WebRtcTransport不在同一条线程)
|
||||
throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName());
|
||||
}
|
||||
|
||||
transport->setSession(shared_from_this());
|
||||
_transport = std::move(transport);
|
||||
InfoP(this);
|
||||
}
|
||||
_ticker.resetTime();
|
||||
CHECK(_transport);
|
||||
_transport->inputSockData((char *)data, len, this);// (struct sockaddr *)&_peer_addr);
|
||||
_transport->inputSockData((char *)data, len, this);
|
||||
}
|
||||
|
||||
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
|
||||
|
|
@ -116,9 +112,11 @@ void WebRtcSession::onError(const SockException &err) {
|
|||
}
|
||||
auto self = shared_from_this();
|
||||
auto transport = std::move(_transport);
|
||||
getPoller()->async([transport, self] {
|
||||
getPoller()->async([transport, self]() mutable {
|
||||
//延时减引用,防止使用transport对象时,销毁对象
|
||||
transport->RemoveTuple(self.get());
|
||||
transport->removeTuple(self.get());
|
||||
//确保transport在Session对象前销毁,防止WebRtcTransport::onDestory()时获取不到Session对象
|
||||
transport = nullptr;
|
||||
}, false);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@ private:
|
|||
bool _over_tcp = false;
|
||||
bool _find_transport = true;
|
||||
Ticker _ticker;
|
||||
struct sockaddr_storage _peer_addr;
|
||||
std::weak_ptr<toolkit::TcpServer> _server;
|
||||
WebRtcTransportImp::Ptr _transport;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -78,10 +78,9 @@ static void translateIPFromEnv(std::vector<std::string> &v) {
|
|||
const char* sockTypeStr(Session* session) {
|
||||
if (session) {
|
||||
switch (session->getSock()->sockType()) {
|
||||
case SockNum::Sock_TCP:
|
||||
return "tcp";
|
||||
case SockNum::Sock_UDP:
|
||||
return "udp";
|
||||
case SockNum::Sock_TCP: return "tcp";
|
||||
case SockNum::Sock_UDP: return "udp";
|
||||
default: break;
|
||||
}
|
||||
}
|
||||
return "unknown";
|
||||
|
|
@ -123,6 +122,8 @@ void WebRtcTransport::OnIceServerSendStunPacket(
|
|||
|
||||
void WebRtcTransportImp::OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) {
|
||||
InfoL << getIdentifier() << " select tuple " << sockTypeStr(tuple) << " " << tuple->get_peer_ip() << ":" << tuple->get_peer_port();
|
||||
tuple->setSendFlushFlag(false);
|
||||
unrefSelf();
|
||||
}
|
||||
|
||||
void WebRtcTransport::OnIceServerConnected(const RTC::IceServer *iceServer) {
|
||||
|
|
@ -227,8 +228,9 @@ void WebRtcTransport::sendSockData(const char *buf, size_t len, RTC::TransportTu
|
|||
onSendSockData(std::move(pkt), true, tuple ? tuple : _ice_server->GetSelectedTuple());
|
||||
}
|
||||
|
||||
RTC::TransportTuple *WebRtcTransport::getSelectedTuple() const {
|
||||
return _ice_server->GetSelectedTuple();
|
||||
Session::Ptr WebRtcTransport::getSession() const {
|
||||
auto tuple = _ice_server->GetSelectedTuple();
|
||||
return tuple ? tuple->shared_from_this() : nullptr;
|
||||
}
|
||||
|
||||
void WebRtcTransport::sendRtcpRemb(uint32_t ssrc, size_t bit_rate) {
|
||||
|
|
@ -1055,32 +1057,16 @@ void WebRtcTransportImp::onBeforeEncryptRtp(const char *buf, int &len, void *ctx
|
|||
void WebRtcTransportImp::onShutdown(const SockException &ex) {
|
||||
WarnL << ex.what();
|
||||
unrefSelf();
|
||||
for (auto &pr : _history_sessions) {
|
||||
auto session = pr.second.lock();
|
||||
if (session) {
|
||||
session->shutdown(ex);
|
||||
}
|
||||
for (auto &tuple : _ice_server->GetTuples()) {
|
||||
tuple->shutdown(ex);
|
||||
}
|
||||
}
|
||||
|
||||
void WebRtcTransportImp::RemoveTuple(RTC::TransportTuple* tuple)
|
||||
{
|
||||
InfoL << getIdentifier() << " RemoveTuple " << tuple->get_peer_ip() << ":" << tuple->get_peer_port();
|
||||
this->_history_sessions.erase(tuple);
|
||||
void WebRtcTransportImp::removeTuple(RTC::TransportTuple *tuple) {
|
||||
InfoL << getIdentifier() << " remove tuple " << tuple->get_peer_ip() << ":" << tuple->get_peer_port();
|
||||
this->_ice_server->RemoveTuple(tuple);
|
||||
}
|
||||
|
||||
void WebRtcTransportImp::setSession(Session::Ptr session) {
|
||||
_history_sessions.emplace(session.get(), session);
|
||||
session->setSendFlushFlag(false);
|
||||
unrefSelf();
|
||||
}
|
||||
|
||||
const Session::Ptr &WebRtcTransportImp::getSession() const {
|
||||
Session* ret = _ice_server?_ice_server->GetSelectedTuple():nullptr;
|
||||
return ret ? ret->shared_from_this() : nullptr;
|
||||
}
|
||||
|
||||
uint64_t WebRtcTransportImp::getBytesUsage() const {
|
||||
return _bytes_usage;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -110,6 +110,7 @@ public:
|
|||
void sendRtcpPacket(const char *buf, int len, bool flush, void *ctx = nullptr);
|
||||
|
||||
const EventPoller::Ptr& getPoller() const;
|
||||
Session::Ptr getSession() const;
|
||||
|
||||
protected:
|
||||
//// dtls相关的回调 ////
|
||||
|
|
@ -158,7 +159,6 @@ protected:
|
|||
virtual void onRtcpBye() = 0;
|
||||
|
||||
protected:
|
||||
RTC::TransportTuple* getSelectedTuple() const;
|
||||
void sendRtcpRemb(uint32_t ssrc, size_t bit_rate);
|
||||
void sendRtcpPli(uint32_t ssrc);
|
||||
|
||||
|
|
@ -238,8 +238,6 @@ public:
|
|||
using Ptr = std::shared_ptr<WebRtcTransportImp>;
|
||||
~WebRtcTransportImp() override;
|
||||
|
||||
void setSession(Session::Ptr session);
|
||||
const Session::Ptr& getSession() const;
|
||||
uint64_t getBytesUsage() const;
|
||||
uint64_t getDuration() const;
|
||||
bool canSendRtp() const;
|
||||
|
|
@ -247,8 +245,8 @@ public:
|
|||
void onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx = false);
|
||||
|
||||
void createRtpChannel(const std::string &rid, uint32_t ssrc, MediaTrack &track);
|
||||
void removeTuple(RTC::TransportTuple* tuple);
|
||||
|
||||
void RemoveTuple(RTC::TransportTuple* tuple);
|
||||
protected:
|
||||
void OnIceServerSelectedTuple(const RTC::IceServer *iceServer, RTC::TransportTuple *tuple) override;
|
||||
WebRtcTransportImp(const EventPoller::Ptr &poller,bool preferred_tcp = false);
|
||||
|
|
@ -293,8 +291,6 @@ private:
|
|||
Ticker _alive_ticker;
|
||||
//pli rtcp计时器
|
||||
Ticker _pli_ticker;
|
||||
//链接迁移前后使用过的udp链接
|
||||
std::unordered_map<Session *, std::weak_ptr<Session> > _history_sessions;
|
||||
//twcc rtcp发送上下文对象
|
||||
TwccContext _twcc_ctx;
|
||||
//根据发送rtp的track类型获取相关信息
|
||||
|
|
|
|||
Loading…
Reference in New Issue