/*
 * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved.
 *
 * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
 *
 * Use of this source code is governed by MIT-like license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#include "WebRtcSignalingPeer.h"
#include "WebRtcSignalingMsg.h"
#include "Util/util.h"
#include "Common/config.h"
#include "json/value.h"

using namespace std;
using namespace toolkit;
using namespace mediakit::Rtc;

namespace mediakit {

// NOTE(review): the template argument lists in this section (ServiceController<...>,
// function<...>) were missing from the reviewed text and are reconstructed from how the
// values are used below - confirm them against the declarations in the headers.

// Signaling servers this process has registered with, keyed by "host:port".
// Registering twice with the same server address is not allowed.
static ServiceController<WebRtcSignalingPeer> s_room_keepers;

/**
 * Builds the registry key ("host:port") for a signaling server address.
 * The port is taken by value: it is only read, and a by-value parameter also accepts
 * temporaries and constants.
 */
static inline string getRoomKeepersKey(const string &host, uint16_t port) {
    return host + ":" + std::to_string(port);
}

/**
 * Connects to a signaling server and registers room_id with it.
 * @param host    signaling server host
 * @param port    signaling server port
 * @param room_id room id to register
 * @param ssl     selects the "wss://" scheme instead of "ws://"
 * @param cb      receives the result together with the registry key; it is called
 *                immediately with Err_success when a keeper for this address already
 *                exists, and with the connect error when the connection fails
 */
void addWebrtcRoomKeeper(const string &host, uint16_t port, const std::string &room_id, bool ssl,
                         const function<void(const SockException &ex, const string &key)> &cb) {
    DebugL;
    auto key = getRoomKeepersKey(host, port);
    if (s_room_keepers.find(key)) {
        // A registration towards this server was already started.
        cb(SockException(Err_success), key);
        return;
    }

    auto peer = s_room_keepers.make(key, host, port, ssl, room_id);
    peer->setOnShutdown([key](const SockException &ex) {
        InfoL << "webrtc peer shutdown, key: " << key << ", " << ex.what();
        s_room_keepers.erase(key);
    });

    // NOTE(review): this callback is stored inside `peer` while owning a shared_ptr to
    // `peer`, which looks like a reference cycle that keeps the peer alive after it is
    // erased from s_room_keepers. Left as-is because other paths (e.g. the asynchronous
    // unregist() issued right before erase()) may depend on that lifetime - confirm.
    peer->setOnConnect([peer, key, cb](const SockException &ex) {
        if (ex) {
            // WebRtcSignalingPeer::onConnect() only starts the response-expiry timer on
            // success, so a register request issued after a failed connect would neither
            // be answered nor time out. Report the failure and free the key so the
            // caller can retry.
            WarnL << "webrtc peer connect failed, key: " << key << ", " << ex.what();
            s_room_keepers.erase(key);
            cb(ex, key);
            return;
        }
        peer->regist(cb);
    });
    peer->connect();
}

/**
 * Unregisters from the signaling server identified by key and drops the keeper.
 * @param key registry key as produced by addWebrtcRoomKeeper
 * @param cb  receives the unregister result, or Err_other when key is unknown
 */
void delWebrtcRoomKeeper(const std::string &key, const std::function<void(const SockException &ex)> &cb) {
    auto peer = s_room_keepers.find(key);
    if (!peer) {
        return cb(SockException(Err_other, "room_key not exist"));
    }
    peer->unregist(cb);
    s_room_keepers.erase(key);
}

/** Invokes cb for every registered keeper. */
void listWebrtcRoomKeepers(const std::function<void(const std::string &key, const WebRtcSignalingPeer::Ptr &p)> &cb) {
    s_room_keepers.for_each(cb);
}

/** Serializes a keeper through WebRtcSignalingPeer::makeInfoJson(). */
Json::Value ToJson(const WebRtcSignalingPeer::Ptr &p) {
    return p->makeInfoJson();
}

/** Looks up the keeper registered for host:port; the result of find() is returned as-is. */
WebRtcSignalingPeer::Ptr getWebrtcRoomKeeper(const string &host, uint16_t port) {
    return s_room_keepers.find(getRoomKeepersKey(host, port));
}

//////////// WebRtcSignalingPeer
////////////////////////// WebRtcSignalingPeer::WebRtcSignalingPeer(const std::string &host, uint16_t port, bool ssl, const std::string &room_id, const EventPoller::Ptr &poller) : WebSocketClient(poller) , _room_id(room_id) { TraceL; // TODO: not support wss now _ws_url = StrPrinter << (ssl ? "wss://" : "ws://") + host << ":" << port << "/signaling"; _room_key = getRoomKeepersKey(host, port); } WebRtcSignalingPeer::~WebRtcSignalingPeer() { DebugL << "room_id: " << _room_id; } void WebRtcSignalingPeer::connect() { DebugL; startWebSocket(_ws_url); } void WebRtcSignalingPeer::regist(const function &cb) { DebugL; std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); getPoller()->async([weak_self, cb]() mutable { if (auto strong_self = weak_self.lock()) { strong_self->sendRegisterRequest(std::move(cb)); } }); } void WebRtcSignalingPeer::unregist(const function &cb) { DebugL; auto trigger = [cb](const SockException &ex, std::string msg) { cb(ex); }; std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); getPoller()->async([weak_self, trigger]() mutable { if (auto strong_self = weak_self.lock()) { strong_self->sendUnregisterRequest(std::move(trigger)); } }); } void WebRtcSignalingPeer::checkIn(const std::string& peer_room_id, const MediaTuple &tuple, const std::string& identifier, const std::string& offer, bool is_play, const function &cb, float timeout_sec) { DebugL; std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); getPoller()->async([=] () mutable { TraceL; if (auto strong_self = weak_self.lock()) { auto guest_id = strong_self->_room_id + "_" + makeRandStr(16); strong_self->_tours.emplace(peer_room_id, std::make_pair(guest_id, identifier)); auto trigger = ([cb, peer_room_id, weak_self](const SockException &ex, const std::string &msg) { auto strong_self = weak_self.lock(); if (ex && strong_self) { strong_self->_tours.erase(peer_room_id); } return cb(ex, msg); }); strong_self->sendCallRequest(peer_room_id, 
guest_id, tuple, offer, is_play, std::move(trigger)); } }); } void WebRtcSignalingPeer::checkOut(const std::string& peer_room_id) { DebugL; std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); getPoller()->async([=] () { TraceL; if (auto strong_self = weak_self.lock()) { auto it = strong_self->_tours.find(peer_room_id); if (it != strong_self->_tours.end()) { auto &guest_id = it->second.first; strong_self->sendByeIndication(peer_room_id, guest_id); strong_self->_tours.erase(it); } } }); } void WebRtcSignalingPeer::candidate(const std::string& transport_identifier, const std::string& candidate, const std::string& ice_ufrag, const std::string& ice_pwd) { std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); getPoller()->async([=] () { if (auto strong_self = weak_self.lock()) { strong_self->sendCandidateIndication(transport_identifier, candidate, ice_ufrag, ice_pwd); } }); } void WebRtcSignalingPeer::processOffer(SIGNALING_MSG_ARGS, WebRtcInterface &transport) { try { auto sdp = transport.getAnswerSdp((const std::string )allArgs[SDP_KEY]); auto tuple = MediaTuple(allArgs[CALL_VHOST_KEY], allArgs[CALL_APP_KEY], allArgs[CALL_STREAM_KEY]); answer(allArgs[GUEST_ID_KEY], tuple, transport.getIdentifier(), sdp, allArgs[TYPE_KEY] == TYPE_VALUE_PLAY, allArgs[TRANSACTION_ID_KEY]); std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); transport.gatheringCandidate(_ice_server, [weak_self](const std::string& transport_identifier, const std::string& candidate, const std::string& ufrag, const std::string& pwd) { if (auto strong_self = weak_self.lock()) { strong_self->candidate(transport_identifier, candidate, ufrag, pwd); } }); } catch (std::exception &ex) { Json::Value body; body[METHOD_KEY] = allArgs[METHOD_KEY]; body[ROOM_ID_KEY] = allArgs[ROOM_ID_KEY]; body[GUEST_ID_KEY] = allArgs[GUEST_ID_KEY]; body[CALL_VHOST_KEY] = allArgs[CALL_VHOST_KEY]; body[CALL_APP_KEY] = allArgs[CALL_APP_KEY]; body[CALL_STREAM_KEY] = 
allArgs[CALL_STREAM_KEY]; body[TYPE_KEY] = allArgs[TYPE_KEY]; sendRefusesResponse(body, allArgs[TRANSACTION_ID_KEY], ex.what()); } } void WebRtcSignalingPeer::answer(const std::string& guest_id, const MediaTuple &tuple, const std::string& identifier, const std::string& sdp, bool is_play, const std::string& transaction_id) { _peer_guests.emplace(guest_id, identifier); sendCallAccept(guest_id, tuple, sdp, is_play, transaction_id); } void WebRtcSignalingPeer::setOnConnect(function cb) { _on_connect = cb ? std::move(cb) : [](const SockException &) {}; } void WebRtcSignalingPeer::onConnect(const SockException &ex) { TraceL; if (_on_connect) { _on_connect(ex); } if (!ex) { createResponseExpiredTimer(); } } void WebRtcSignalingPeer::setOnShutdown(function cb) { _on_shutdown = cb ? std::move(cb) : [](const SockException &) {}; } void WebRtcSignalingPeer::onShutdown(const SockException &ex) { TraceL; if (_on_shutdown) { _on_shutdown(ex); } } void WebRtcSignalingPeer::onRecv(const Buffer::Ptr &buffer) { TraceL << "recv msg:\r\n" << buffer->data(); Json::Value args; Json::Reader reader; reader.parse(buffer->data(), args); Parser parser; HttpAllArgs allArgs(parser, args); CHECK_ARGS(METHOD_KEY, TRANSACTION_ID_KEY); using MsgHandler = void (WebRtcSignalingPeer::*)(SIGNALING_MSG_ARGS); static std::unordered_map, MsgHandler, ClassMethodHash> s_msg_handlers; static onceToken token([]() { s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_ACCEPT, METHOD_VALUE_REGISTER), &WebRtcSignalingPeer::handleRegisterAccept); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REJECT, METHOD_VALUE_REGISTER), &WebRtcSignalingPeer::handleRegisterReject); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_ACCEPT, METHOD_VALUE_UNREGISTER), &WebRtcSignalingPeer::handleUnregisterAccept); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REJECT, METHOD_VALUE_UNREGISTER), &WebRtcSignalingPeer::handleUnregisterReject); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REQUEST, METHOD_VALUE_CALL), 
&WebRtcSignalingPeer::handleCallRequest); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_ACCEPT, METHOD_VALUE_CALL), &WebRtcSignalingPeer::handleCallAccept); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REJECT, METHOD_VALUE_CALL), &WebRtcSignalingPeer::handleCallReject); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_INDICATION, METHOD_VALUE_CANDIDATE), &WebRtcSignalingPeer::handleCandidateIndication); s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_INDICATION, METHOD_VALUE_BYE), &WebRtcSignalingPeer::handleByeIndication); }); auto it = s_msg_handlers.find(std::make_pair(allArgs[CLASS_KEY], allArgs[METHOD_KEY])); if (it == s_msg_handlers.end()) { WarnL << "unsupported class: "<< allArgs[CLASS_KEY] << ", method: " << allArgs[METHOD_KEY] << ", ignore"; return; } return (this->*(it->second))(allArgs); } void WebRtcSignalingPeer::onError(const SockException &err) { WarnL << "room_id: " << _room_id; s_room_keepers.erase(_room_key); // 除非对端显式的发送了注销执行,否则因为网络异常导致的会话中断,不影响已经进行通信的webrtc会话,仅作移除 } bool WebRtcSignalingPeer::responseFilter(SIGNALING_MSG_ARGS, ResponseTrigger& trigger) { if (allArgs[CLASS_KEY] != CLASS_VALUE_ACCEPT && allArgs[CLASS_KEY] != CLASS_VALUE_REJECT) { return false; } for (auto &pr : _response_list) { auto &transaction_id = pr.first; // mismatch transaction_id if (transaction_id != allArgs[TRANSACTION_ID_KEY] && !transaction_id.empty()) { continue; } auto &handle = pr.second; if (allArgs[METHOD_KEY] != handle.method) { WarnL << "recv response method: " << allArgs[METHOD_KEY] << " mismatch request method: " << handle.method; return false; } trigger = std::move(handle.cb); _response_list.erase(transaction_id); return true; } return false; } void WebRtcSignalingPeer::sendRegisterRequest(ResponseTrigger trigger) { TraceL; Json::Value body; body[CLASS_KEY] = CLASS_VALUE_REQUEST; body[METHOD_KEY] = METHOD_VALUE_REGISTER; body[ROOM_ID_KEY] = getRoomId(); sendRequest(body, std::move(trigger)); } void 
WebRtcSignalingPeer::handleRegisterAccept(SIGNALING_MSG_ARGS) { TraceL; ResponseTrigger trigger; if (!responseFilter(allArgs, trigger)) { return; } auto jsonArgs = allArgs.getArgs(); auto ice_servers = jsonArgs[ICE_SERVERS_KEY]; if (ice_servers.type() != Json::ValueType::arrayValue) { _StrPrinter msg; msg << "illegal \"" << ICE_SERVERS_KEY << "\" point"; WarnL << msg; trigger(SockException(Err_other, msg), getRoomKey()); return; } if (ice_servers.empty()) { _StrPrinter msg; msg << "no ice server found in \"" << ICE_SERVERS_KEY << "\" point"; WarnL << msg; trigger(SockException(Err_other, msg), getRoomKey()); return; } for (auto &ice_server : ice_servers) { // only support 1 ice_server now auto url = ice_server[URL_KEY].asString(); _ice_server = std::make_shared(url); _ice_server->_ufrag = ice_server[UFRAG_KEY].asString(); _ice_server->_pwd = ice_server[PWD_KEY].asString(); } trigger(SockException(Err_success), getRoomKey()); } void WebRtcSignalingPeer::handleRegisterReject(SIGNALING_MSG_ARGS) { TraceL; ResponseTrigger trigger; if (!responseFilter(allArgs, trigger)) { return; } auto ex = SockException(Err_other, StrPrinter << "register refuses by server, reason: " << allArgs[REASON_KEY]); trigger(ex, getRoomKey()); onShutdown(ex); } void WebRtcSignalingPeer::sendUnregisterRequest(ResponseTrigger trigger) { TraceL; Json::Value body; body[CLASS_KEY] = CLASS_VALUE_REQUEST; body[METHOD_KEY] = METHOD_VALUE_UNREGISTER; body[ROOM_ID_KEY] = _room_id; sendRequest(body, std::move(trigger)); } void WebRtcSignalingPeer::handleUnregisterAccept(SIGNALING_MSG_ARGS) { ResponseTrigger trigger; if (!responseFilter(allArgs, trigger)) { return; } trigger(SockException(Err_success), getRoomKey()); } void WebRtcSignalingPeer::handleUnregisterReject(SIGNALING_MSG_ARGS) { ResponseTrigger trigger; if (!responseFilter(allArgs, trigger)) { return; } auto ex = SockException(Err_other, StrPrinter << "unregister refuses by server, reason: " << allArgs[REASON_KEY]); trigger(ex, getRoomKey()); } 
void WebRtcSignalingPeer::sendCallRequest(const std::string& peer_room_id, const std::string& guest_id, const MediaTuple &tuple, const std::string& sdp, bool is_play, ResponseTrigger trigger) { DebugL; Json::Value body; body[CLASS_KEY] = CLASS_VALUE_REQUEST; body[METHOD_KEY] = METHOD_VALUE_CALL; body[TYPE_KEY] = is_play? TYPE_VALUE_PLAY : TYPE_VALUE_PUSH; body[GUEST_ID_KEY] = guest_id; //our guest id body[ROOM_ID_KEY] = peer_room_id; body[CALL_VHOST_KEY] = tuple.vhost; body[CALL_APP_KEY] = tuple.app; body[CALL_STREAM_KEY] = tuple.stream; body[SDP_KEY] = sdp; sendRequest(body, std::move(trigger)); } void WebRtcSignalingPeer::sendCallAccept(const std::string& peer_guest_id, const MediaTuple &tuple, const std::string& sdp, bool is_play, const std::string& transaction_id) { DebugL; Json::Value body; body[CLASS_KEY] = CLASS_VALUE_ACCEPT; body[METHOD_KEY] = METHOD_VALUE_CALL; body[TRANSACTION_ID_KEY] = transaction_id; body[TYPE_KEY] = is_play? TYPE_VALUE_PLAY : TYPE_VALUE_PUSH; body[GUEST_ID_KEY] = peer_guest_id; body[ROOM_ID_KEY] = _room_id; //our room id body[CALL_VHOST_KEY] = tuple.vhost; body[CALL_APP_KEY] = tuple.app; body[CALL_STREAM_KEY] = tuple.stream; body[SDP_KEY] = sdp; sendPacket(body); } void WebRtcSignalingPeer::handleCallRequest(SIGNALING_MSG_ARGS) { DebugL; CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CALL_VHOST_KEY, CALL_APP_KEY, CALL_STREAM_KEY, TYPE_KEY); if (allArgs[ROOM_ID_KEY] != getRoomId()) { WarnL << "target room_id: " << allArgs[ROOM_ID_KEY] << "mismatch our room_id: " << getRoomId(); return; } auto args = std::make_shared>(allArgs, allArgs[GUEST_ID_KEY]); std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); WebRtcPluginManager::Instance().negotiateSdp(*this, allArgs[TYPE_KEY], *args, [allArgs, weak_self](const WebRtcInterface &exchanger) mutable { if (auto strong_self = weak_self.lock()) { strong_self->processOffer(allArgs, const_cast(exchanger)); } }); } void WebRtcSignalingPeer::handleCallAccept(SIGNALING_MSG_ARGS) { DebugL; 
ResponseTrigger trigger; if (!responseFilter(allArgs, trigger)) { return; } CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CALL_VHOST_KEY, CALL_APP_KEY, CALL_STREAM_KEY, TYPE_KEY); auto room_id = allArgs[ROOM_ID_KEY]; auto it = _tours.find(room_id); if (it == _tours.end()) { WarnL << "not found room_id: " << room_id << " in tours"; return; } auto &guest_id = it->second.first; if (allArgs[GUEST_ID_KEY] != guest_id) { WarnL << "guest_id: " << allArgs[GUEST_ID_KEY] << "mismatch our guest_id: " << guest_id; return; } trigger(SockException(Err_success), allArgs[SDP_KEY]); } void WebRtcSignalingPeer::handleCallReject(SIGNALING_MSG_ARGS) { DebugL; ResponseTrigger trigger; if (!responseFilter(allArgs, trigger)) { return; } CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CALL_VHOST_KEY, CALL_APP_KEY, CALL_STREAM_KEY, TYPE_KEY); auto room_id = allArgs[ROOM_ID_KEY]; auto it = _tours.find(room_id); if (it == _tours.end()) { WarnL << "not found room_id: " << room_id << " in tours"; return; } auto &guest_id = it->second.first; if (allArgs[GUEST_ID_KEY] != guest_id) { WarnL << "guest_id: " << allArgs[GUEST_ID_KEY] << "mismatch our guest_id: " << guest_id; return; } _tours.erase(room_id); trigger(SockException(Err_other, StrPrinter << "call refuses by server, reason: " << allArgs[REASON_KEY]), ""); } void WebRtcSignalingPeer::handleCandidateIndication(SIGNALING_MSG_ARGS) { DebugL; CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CANDIDATE_KEY, UFRAG_KEY, PWD_KEY); std::string identifier; std::string room_id = allArgs[ROOM_ID_KEY]; std::string guest_id = allArgs[GUEST_ID_KEY]; //作为被叫 if (room_id == getRoomId()) { auto it = _peer_guests.find(guest_id); if (it == _peer_guests.end()) { WarnL << "not found guest_id: " << guest_id; return; } identifier = it->second; } else { //作为主叫 for (auto it : _tours) { if (room_id != it.first) { continue; } auto info = it.second; if (guest_id != info.first) { break; } identifier = info.second; } } TraceL << "recv remote candidate: " << allArgs[CANDIDATE_KEY]; if 
(identifier.empty()) { WarnL << "target room_id: " << room_id << " not match our room_id: " << getRoomId() << ", and target guest_id: " << guest_id << " not match"; return; } auto transport = WebRtcTransportManager::Instance().getItem(identifier); if (transport) { SdpAttrCandidate candidate_attr; candidate_attr.parse(allArgs[CANDIDATE_KEY]); transport->connectivityCheck(candidate_attr, allArgs[UFRAG_KEY], allArgs[PWD_KEY]); } } void WebRtcSignalingPeer::handleByeIndication(SIGNALING_MSG_ARGS) { DebugL; CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY); if (allArgs[ROOM_ID_KEY] != getRoomId()) { WarnL << "target room_id: " << allArgs[ROOM_ID_KEY] << "not match our room_id: " << getRoomId(); return; } auto it = _peer_guests.find(allArgs[GUEST_ID_KEY]); if (it == _peer_guests.end()) { WarnL << "not found guest_id: " << allArgs[GUEST_ID_KEY]; return; } auto identifier = it->second; _peer_guests.erase(it); auto obj = WebRtcTransportManager::Instance().getItem(identifier); if (obj) { obj->safeShutdown(SockException(Err_shutdown, "deleted by websocket signaling server")); } } void WebRtcSignalingPeer::sendByeIndication(const std::string& peer_room_id, const std::string &guest_id) { DebugL; Json::Value body; body[CLASS_KEY] = CLASS_VALUE_INDICATION; body[METHOD_KEY] = METHOD_VALUE_BYE; body[GUEST_ID_KEY] = guest_id; //our guest id body[ROOM_ID_KEY] = peer_room_id; body[SENDER_KEY] = guest_id; sendIndication(body); } void WebRtcSignalingPeer::sendCandidateIndication(const std::string& transport_identifier, const std::string& candidate, const std::string& ice_ufrag, const std::string& ice_pwd) { TraceL; Json::Value body; body[CLASS_KEY] = CLASS_VALUE_INDICATION; body[METHOD_KEY] = METHOD_VALUE_CANDIDATE; body[CANDIDATE_KEY] = candidate; body[UFRAG_KEY] = ice_ufrag; body[PWD_KEY] = ice_pwd; //作为被叫 for (auto &pr : _peer_guests) { if (pr.second == transport_identifier) { body[ROOM_ID_KEY] = _room_id; body[GUEST_ID_KEY] = pr.first; //peer_guest_id body[SENDER_KEY] = _room_id; return 
sendIndication(body); } } //作为主叫 for (auto &pr : _tours) { auto &info = pr.second; if (info.second == transport_identifier) { body[ROOM_ID_KEY] = pr.first; //peer room id body[GUEST_ID_KEY] = info.first; //our_guest_id body[SENDER_KEY] = info.first; return sendIndication(body); } } } void WebRtcSignalingPeer::sendAcceptResponse(const std::string& method, const std::string& transaction_id, const std::string& room_id, const std::string& guest_id, const std::string& reason) { // TODO } void WebRtcSignalingPeer::sendRefusesResponse(Json::Value &body, const std::string& transaction_id, const std::string& reason) { body[CLASS_KEY] = CLASS_VALUE_REJECT; body[REASON_KEY] = reason; sendResponse(body, transaction_id); } void WebRtcSignalingPeer::sendRequest(Json::Value& body, ResponseTrigger trigger, float seconds) { auto transaction_id = makeRandStr(32); body[TRANSACTION_ID_KEY] = transaction_id; ResponseTuple tuple; tuple.ttl_ms = seconds * 1000; tuple.method = body[METHOD_KEY].asString(); tuple.cb = std::move(trigger); _response_list.emplace(std::move(transaction_id), std::move(tuple)); sendPacket(body); } void WebRtcSignalingPeer::sendIndication(Json::Value &body) { auto transaction_id = makeRandStr(32); body[TRANSACTION_ID_KEY] = transaction_id; sendPacket(body); } void WebRtcSignalingPeer::sendResponse(Json::Value &body, const std::string& transaction_id) { body[TRANSACTION_ID_KEY] = transaction_id; sendPacket(body); } void WebRtcSignalingPeer::sendPacket(Json::Value& body) { auto msg = body.toStyledString(); DebugL << "send msg: " << msg; SockSender::send(std::move(msg)); } Json::Value WebRtcSignalingPeer::makeInfoJson() { Json::Value item; item["room_id"] = getRoomId(); item["room_key"] = getRoomKey(); Json::Value peer_guests_obj(Json::arrayValue); auto peer_guests = _peer_guests; for(auto &guest : peer_guests) { Json::Value obj; obj["guest_id"] = guest.first; obj["transport_identifier"] = guest.second; peer_guests_obj.append(std::move(obj)); } item["guests"] = 
std::move(peer_guests_obj); Json::Value tours_obj(Json::arrayValue); auto tours = _tours; for(auto &tour : tours){ Json::Value obj; obj["room_id"] = tour.first; obj["guest_id"] = tour.second.first; obj["transport_identifier"] = tour.second.second; tours_obj.append(std::move(obj)); } item["tours"] = std::move(tours_obj); return item; } void WebRtcSignalingPeer::createResponseExpiredTimer() { std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); _expire_timer = std::make_shared(0.2, [weak_self]() { if (auto strong_self = weak_self.lock()) { strong_self->checkResponseExpired(); return true; // 继续定时器 } return false; }, getPoller()); } void WebRtcSignalingPeer::checkResponseExpired() { //FIXME: 移动到专门的超时timer中处理 #if 0 // 设置计时器以检测 offer 响应超时 _offer_timeout_timer = std::make_shared( timeout_sec, [this, cb, peer_room_id]() { _tours.erase(peer_room_id); return false; // 停止计时器 }, getPoller() ); #endif for (auto it = _response_list.begin(); it != _response_list.end();) { auto &tuple = it->second; if (!tuple.expired()) { ++it; continue; } // over time WarnL << "transaction_id: " << it->first << ", method: " << tuple.method << " recv response over time"; tuple.cb(SockException(Err_timeout, "recv response timeout"), ""); it = _response_list.erase(it); } } }// namespace mediakit