完善webrtc over tcp

This commit is contained in:
ziyue 2022-11-18 16:36:31 +08:00
parent 6ce6b63a2c
commit afde1ec9bd
6 changed files with 82 additions and 99 deletions

View File

@ -277,19 +277,20 @@ int start_main(int argc,char *argv[]) {
#endif//defined(ENABLE_RTPPROXY)
#if defined(ENABLE_WEBRTC)
auto rtcSrv_tcp = std::make_shared<TcpServer>();
//webrtc udp服务器
auto rtcSrv = std::make_shared<TcpServer>();
// rtcSrv->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
// if (!buf) {
// return Socket::createSocket(poller, false);
// }
// auto new_poller = WebRtcSession::queryPoller(buf);
// if (!new_poller) {
// //该数据对应的webrtc对象未找到丢弃之
// return Socket::Ptr();
// }
// return Socket::createSocket(new_poller, false);
// });
auto rtcSrv_udp = std::make_shared<UdpServer>();
rtcSrv_udp->setOnCreateSocket([](const EventPoller::Ptr &poller, const Buffer::Ptr &buf, struct sockaddr *, int) {
if (!buf) {
return Socket::createSocket(poller, false);
}
auto new_poller = WebRtcSession::queryPoller(buf);
if (!new_poller) {
//该数据对应的webrtc对象未找到丢弃之
return Socket::Ptr();
}
return Socket::createSocket(new_poller, false);
});
uint16_t rtcPort = mINI::Instance()[Rtc::kPort];
#endif//defined(ENABLE_WEBRTC)
@ -337,7 +338,7 @@ int start_main(int argc,char *argv[]) {
#if defined(ENABLE_WEBRTC)
//webrtc udp服务器
if (rtcPort) { rtcSrv->start<WebRtcSession>(rtcPort); }
if (rtcPort) { rtcSrv_udp->start<WebRtcSession>(rtcPort); rtcSrv_tcp->start<WebRtcSession>(rtcPort); }
#endif//defined(ENABLE_WEBRTC)
#if defined(ENABLE_SRT)

View File

@ -340,8 +340,7 @@ bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {
}
void MultiMediaSourceMuxer::onAllTrackReady() {
//TODO fix this poller is not current thread
//CHECK(!_create_in_poller || getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread());
CHECK(!_create_in_poller || getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread());
setMediaListener(getDelegate());
if (_rtmp) {

View File

@ -15,9 +15,7 @@ using namespace std;
namespace mediakit {
static string getUserName(const Buffer::Ptr &buffer) {
auto buf = buffer->data() + 2;
auto len = buffer->size() - 2;
static string getUserName(const char *buf, size_t len) {
if (!RTC::StunPacket::IsStun((const uint8_t *) buf, len)) {
return "";
}
@ -35,7 +33,7 @@ static string getUserName(const Buffer::Ptr &buffer) {
}
EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) {
auto user_name = getUserName(buffer);
auto user_name = getUserName(buffer->data(), buffer->size());
if (user_name.empty()) {
return nullptr;
}
@ -45,86 +43,42 @@ EventPoller::Ptr WebRtcSession::queryPoller(const Buffer::Ptr &buffer) {
////////////////////////////////////////////////////////////////////////////////
WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : TcpSession(sock) {
WebRtcSession::WebRtcSession(const Socket::Ptr &sock) : Session(sock) {
socklen_t addr_len = sizeof(_peer_addr);
getpeername(sock->rawFD(), (struct sockaddr *)&_peer_addr, &addr_len);
_over_tcp = sock->sockType() == SockNum::Sock_TCP;
}
WebRtcSession::~WebRtcSession() {
InfoP(this);
}
/*
* Framing RFC 4571
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* ---------------------------------------------------------------
* | LENGTH | RTP or RTCP packet ... |
* ---------------------------------------------------------------
* The bit field definition of the framing method
* A 16-bit unsigned integer LENGTH field, coded in network byte order
* (big-endian), begins the frame. If LENGTH is non-zero, an RTP or
* RTCP packet follows the LENGTH field. The value coded in the LENGTH
* field MUST equal the number of octets in the RTP or RTCP packet.
* Zero is a valid value for LENGTH, and it codes the null packet.
*/
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
//只允许寻找一次transport
if (!_transport) {
auto user_name = getUserName(buffer);
void WebRtcSession::onRecv_l(const char *data, size_t len) {
if (_find_transport) {
// 只允许寻找一次transport
_find_transport = false;
auto user_name = getUserName(data, len);
auto transport = WebRtcTransportManager::Instance().getItem(user_name);
//TODO fix this poller is not current thread
//CHECK(transport && transport->getPoller()->isCurrentThread());
CHECK(transport && transport->getPoller()->isCurrentThread());
transport->setSession(shared_from_this());
_transport = std::move(transport);
InfoP(this);
}
_ticker.resetTime();
CHECK(_transport);
//_transport->inputSockData(buffer->data() + 2, buffer->size() - 2, (struct sockaddr *)&_peer_addr);
_transport->inputSockData((char *)data, len, (struct sockaddr *)&_peer_addr);
}
//一个tcp数据包里面可能会有多帧
uint8_t* buf = reinterpret_cast<uint8_t *>(buffer->data());
size_t buf_size = buffer->size();
size_t frame_start = 0;
size_t remian_len = 0;
size_t frame_size = 0;
for (;;) {
remian_len = buf_size - frame_start;
if(remian_len >= 2){
frame_size = size_t { Utils::Byte::Get2Bytes(buf + frame_start, 0) };
}
//解析出来了一帧tcp frame
if (remian_len >= 2 && remian_len >= 2 + frame_size) {
const uint8_t *frame = buf + frame_start + 2;
if(frame_size != 0){
_transport->inputSockData((char *)frame, frame_size, (struct sockaddr *)&_peer_addr);
}
//数据全部解析完毕
if((frame_start + 2 + frame_size) == buf_size){
break;
}
//更新解析buf的起始位置
else{
frame_start += (2 + frame_size);
}
//还有数据 需要继续解析
if (buf_size > frame_start) {
continue;
}
break;
}
//包解析出错了 丢弃
else{
WarnL<<"Incomplete packet";
break;
}
void WebRtcSession::onRecv(const Buffer::Ptr &buffer) {
if (_over_tcp) {
input(buffer->data(), buffer->size());
} else {
onRecv_l(buffer->data(), buffer->size());
}
}
void WebRtcSession::onError(const SockException &err) {
//udp链接超时但是rtc链接不一定超时因为可能存在udp链接迁移的情况
//udp链接超时但是rtc链接不一定超时因为可能存在链接迁移的情况
//在udp链接迁移时新的WebRtcSession对象将接管WebRtcTransport对象的生命周期
//本WebRtcSession对象将在超时后自动销毁
WarnP(this) << err.what();
@ -150,6 +104,25 @@ void WebRtcSession::onManager() {
}
}
ssize_t WebRtcSession::onRecvHeader(const char *data, size_t len) {
onRecv_l(data + 2, len - 2);
return 0;
}
const char *WebRtcSession::onSearchPacketTail(const char *data, size_t len) {
if (len < 2) {
// 数据不够
return nullptr;
}
uint16_t length = (((uint8_t *)data)[0] << 8) | ((uint8_t *)data)[1];
if (len < (size_t)(length + 2)) {
// 数据不够
return nullptr;
}
// 返回rtp包末尾
return data + 2 + length;
}
}// namespace mediakit

View File

@ -15,10 +15,11 @@
#include "Network/Session.h"
#include "IceServer.hpp"
#include "WebRtcTransport.h"
#include "Http/HttpRequestSplitter.h"
namespace mediakit {
class WebRtcSession : public TcpSession {
class WebRtcSession : public Session, public HttpRequestSplitter {
public:
WebRtcSession(const Socket::Ptr &sock);
~WebRtcSession() override;
@ -26,11 +27,18 @@ public:
void onRecv(const Buffer::Ptr &) override;
void onError(const SockException &err) override;
void onManager() override;
//std::string getIdentifier() const override;
static EventPoller::Ptr queryPoller(const Buffer::Ptr &buffer);
private:
//// HttpRequestSplitter override ////
ssize_t onRecvHeader(const char *data, size_t len) override;
const char *onSearchPacketTail(const char *data, size_t len) override;
void onRecv_l(const char *data, size_t len);
private:
bool _over_tcp = false;
bool _find_transport = true;
Ticker _ticker;
struct sockaddr_storage _peer_addr;
std::shared_ptr<WebRtcTransportImp> _transport;

View File

@ -419,20 +419,20 @@ void WebRtcTransportImp::onSendSockData(Buffer::Ptr buf, bool flush, RTC::Transp
return;
}
//增加tcp两字节头
auto pkt = _packet_pool.obtain2();
int len = buf->size();
pkt->setCapacity(buf->size() + 2 + 1);
char tcp_len[2] = {0};
tcp_len[0] = ((int16_t)len >> 8)&0xff;
tcp_len[1] = (int16_t)len & 0xff;
pkt->assign2(tcp_len, 2);
pkt->assign2(buf->data(), len , 2);
pkt->setSize(len + 2);
// 一次性发送一帧的rtp数据提高网络io性能
_selected_session->setSendFlushFlag(flush);
_selected_session->send(std::move(pkt));
if (_selected_session->getSock()->sockType() == SockNum::Sock_TCP) {
// 增加tcp两字节头
auto len = buf->size();
char tcp_len[2] = { 0 };
tcp_len[0] = (len >> 8) & 0xff;
tcp_len[1] = len & 0xff;
_selected_session->SockSender::send(tcp_len, 2);
}
_selected_session->send(std::move(buf));
if (flush) {
_selected_session->flushAll();
}
}
///////////////////////////////////////////////////////////////////
@ -620,11 +620,13 @@ void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
});
if (extern_ips.empty()) {
std::string localIp = SockUtil::get_local_ip();
configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "tcp"));
configure.addCandidate(*makeIceCandidate(localIp, local_port, 120, "udp"));
configure.addCandidate(*makeIceCandidate(localIp, local_port, 110, "tcp"));
} else {
const uint32_t delta = 10;
uint32_t priority = 100 + delta * extern_ips.size();
for (auto ip : extern_ips) {
configure.addCandidate(*makeIceCandidate(ip, local_port, priority + 5, "udp"));
configure.addCandidate(*makeIceCandidate(ip, local_port, priority, "tcp"));
priority -= delta;
}
@ -1054,6 +1056,7 @@ void WebRtcTransportImp::setSession(Session::Ptr session) {
<< session->get_peer_port() << ", id:" << getIdentifier();
}
_selected_session = std::move(session);
_selected_session->setSendFlushFlag(false);
unrefSelf();
}

View File

@ -127,9 +127,6 @@ protected:
void OnDtlsTransportSendData(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) override;
void OnDtlsTransportApplicationDataReceived(const RTC::DtlsTransport *dtlsTransport, const uint8_t *data, size_t len) override;
// 循环池
ResourcePool<BufferRaw> _packet_pool;
protected:
//// ice相关的回调 ///
void OnIceServerSendStunPacket(const RTC::IceServer *iceServer, const RTC::StunPacket *packet, RTC::TransportTuple *tuple) override;
@ -182,6 +179,8 @@ private:
std::shared_ptr<RTC::SrtpSession> _srtp_session_send;
std::shared_ptr<RTC::SrtpSession> _srtp_session_recv;
Ticker _ticker;
// 循环池
ResourcePool<BufferRaw> _packet_pool;
#ifdef ENABLE_SCTP
RTC::SctpAssociationImp::Ptr _sctp;