webrtc tcp模式支持线程安全

This commit is contained in:
ziyue 2022-11-18 22:43:29 +08:00
parent f05425825f
commit 82b72fe718
3 changed files with 30 additions and 2 deletions

@ -1 +1 @@
Subproject commit 90ba564e9e39a120ed7b99260f2835a19811af30
Subproject commit 894be81929f227583081755288ab0927c077e411

View File

@ -10,6 +10,7 @@
#include "WebRtcSession.h"
#include "Util/util.h"
#include "Network/TcpServer.h"
using namespace std;
@ -53,13 +54,34 @@ WebRtcSession::~WebRtcSession() {
InfoP(this);
}
void WebRtcSession::attachServer(const Server &server) {
_server = std::dynamic_pointer_cast<toolkit::TcpServer>(const_cast<Server &>(server).shared_from_this());
}
void WebRtcSession::onRecv_l(const char *data, size_t len) {
if (_find_transport) {
// 只允许寻找一次transport
_find_transport = false;
auto user_name = getUserName(data, len);
auto transport = WebRtcTransportManager::Instance().getItem(user_name);
CHECK(transport && transport->getPoller()->isCurrentThread());
CHECK(transport);
//WebRtcTransport在其他poller线程上需要切换poller线程并重新创建WebRtcSession对象
if (!transport->getPoller()->isCurrentThread()) {
auto sock = Socket::createSocket(transport->getPoller());
sock->cloneFromPeerSocket(*(getSock()));
auto server = _server;
std::string str(data, len);
sock->getPoller()->async([sock, server, str](){
auto strong_server = server.lock();
if (strong_server) {
auto session = static_pointer_cast<WebRtcSession>(strong_server->createSession(sock));
session->onRecv_l(str.data(), str.size());
}
});
throw std::runtime_error("webrtc over tcp change poller: " + getPoller()->getThreadName() + " -> " + sock->getPoller()->getThreadName());
}
transport->setSession(shared_from_this());
_transport = std::move(transport);
InfoP(this);

View File

@ -17,6 +17,10 @@
#include "WebRtcTransport.h"
#include "Http/HttpRequestSplitter.h"
namespace toolkit {
class TcpServer;
}
namespace mediakit {
class WebRtcSession : public Session, public HttpRequestSplitter {
@ -24,6 +28,7 @@ public:
WebRtcSession(const Socket::Ptr &sock);
~WebRtcSession() override;
void attachServer(const Server &server) override;
void onRecv(const Buffer::Ptr &) override;
void onError(const SockException &err) override;
void onManager() override;
@ -41,6 +46,7 @@ private:
bool _find_transport = true;
Ticker _ticker;
struct sockaddr_storage _peer_addr;
std::weak_ptr<toolkit::TcpServer> _server;
std::shared_ptr<WebRtcTransportImp> _transport;
};