diff --git a/api/include/mk_events.h b/api/include/mk_events.h index 7e18263b..8f433a2a 100644 --- a/api/include/mk_events.h +++ b/api/include/mk_events.h @@ -177,6 +177,33 @@ typedef struct { */ void(API_CALL *on_mk_media_send_rtp_stop)(const char *vhost, const char *app, const char *stream, const char *ssrc, int err, const char *msg); + /** + * rtc sctp连接中/完成/失败/关闭回调 + * @param rtc_transport 数据通道对象 + */ + void(API_CALL *on_mk_rtc_sctp_connecting)(mk_rtc_transport rtc_transport); + void(API_CALL *on_mk_rtc_sctp_connected)(mk_rtc_transport rtc_transport); + void(API_CALL *on_mk_rtc_sctp_failed)(mk_rtc_transport rtc_transport); + void(API_CALL *on_mk_rtc_sctp_closed)(mk_rtc_transport rtc_transport); + + /** + * rtc数据通道发送数据回调 + * @param rtc_transport 数据通道对象 + * @param msg 数据 + * @param len 数据长度 + */ + void(API_CALL *on_mk_rtc_sctp_send)(mk_rtc_transport rtc_transport, const uint8_t *msg, size_t len); + + /** + * rtc数据通道接收数据回调 + * @param rtc_transport 数据通道对象 + * @param streamId 流id + * @param ppid 协议id + * @param msg 数据 + * @param len 数据长度 + */ + void(API_CALL *on_mk_rtc_sctp_received)(mk_rtc_transport rtc_transport, uint16_t streamId, uint32_t ppid, const uint8_t *msg, size_t len); + } mk_events; diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index ad039b5c..d44ed666 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -352,6 +352,20 @@ API_EXPORT mk_auth_invoker API_CALL mk_auth_invoker_clone(const mk_auth_invoker */ API_EXPORT void API_CALL mk_auth_invoker_clone_release(const mk_auth_invoker ctx); +///////////////////////////////////////////WebRtcTransport///////////////////////////////////////////// +//WebRtcTransport对象的C映射 +typedef struct mk_rtc_transport_t *mk_rtc_transport; + +/** + * 发送rtc数据通道 + * @param ctx 数据通道对象 + * @param streamId 流id + * @param ppid 协议id + * @param msg 数据 + * @param len 数据长度 + */ +API_EXPORT void API_CALL mk_rtc_send_datachannel(const mk_rtc_transport ctx, uint16_t streamId, uint32_t ppid, const char* msg, size_t len); + #ifdef __cplusplus } #endif diff --git a/api/source/mk_events.cpp b/api/source/mk_events.cpp index ae454e60..52ac6e57 100644 --- a/api/source/mk_events.cpp +++ b/api/source/mk_events.cpp @@ -14,6 +14,7 @@ #include "Http/HttpSession.h" #include "Rtsp/RtspSession.h" #include "Record/MP4Recorder.h" +#include "webrtc/WebRtcTransport.h" using namespace toolkit; using namespace mediakit; @@ -167,6 +168,42 @@ API_EXPORT void API_CALL mk_events_listen(const mk_events *events){ sender.getMediaTuple().stream.c_str(), ssrc.c_str(), ex.getErrCode(), ex.what()); } }); + + NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpConnecting,[](BroadcastRtcSctpConnectArgs){ + if (s_events.on_mk_rtc_sctp_connecting) { + s_events.on_mk_rtc_sctp_connecting((mk_rtc_transport)&sender); + } + }); + + NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpConnected,[](BroadcastRtcSctpConnectArgs){ + if (s_events.on_mk_rtc_sctp_connected) { + s_events.on_mk_rtc_sctp_connected((mk_rtc_transport)&sender); + } + }); + + NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpFailed,[](BroadcastRtcSctpConnectArgs){ + if (s_events.on_mk_rtc_sctp_failed) { + s_events.on_mk_rtc_sctp_failed((mk_rtc_transport)&sender); + } + }); + + NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpClosed,[](BroadcastRtcSctpConnectArgs){ + if (s_events.on_mk_rtc_sctp_closed) { + s_events.on_mk_rtc_sctp_closed((mk_rtc_transport)&sender); + } + }); + + NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpSend,[](BroadcastRtcSctpSendArgs){ + if (s_events.on_mk_rtc_sctp_send) { + s_events.on_mk_rtc_sctp_send((mk_rtc_transport)&sender, data, len); + } + }); + + NoticeCenter::Instance().addListener(&s_tag, Broadcast::kBroadcastRtcSctpReceived,[](BroadcastRtcSctpReceivedArgs){ + if (s_events.on_mk_rtc_sctp_received) { + s_events.on_mk_rtc_sctp_received((mk_rtc_transport)&sender, streamId, ppid, msg, len); + } + }); }); } diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 1d74e0e1..89b8c659 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -17,6 +17,7 @@ #include "Http/HttpClient.h" #include "Rtsp/RtspSession.h" +#include "webrtc/WebRtcTransport.h" using namespace toolkit; using namespace mediakit; @@ -497,4 +498,22 @@ API_EXPORT void API_CALL mk_auth_invoker_clone_release(const mk_auth_invoker ctx assert(ctx); Broadcast::AuthInvoker *invoker = (Broadcast::AuthInvoker *)ctx; delete invoker; -} \ No newline at end of file +} + +///////////////////////////////////////////WebRtcTransport///////////////////////////////////////////// +API_EXPORT void API_CALL mk_rtc_sendDatachannel(const mk_rtc_transport ctx, uint16_t streamId, uint32_t ppid, const char *msg, size_t len) { +#ifdef ENABLE_WEBRTC + assert(ctx && msg); + WebRtcTransport *transport = (WebRtcTransport *)ctx; + std::string msg_str(msg, len); + std::weak_ptr weak_trans = transport->shared_from_this(); + transport->getPoller()->async([streamId, ppid, msg_str, weak_trans]() { + // 切换线程后再操作 + if (auto trans = weak_trans.lock()) { + trans->sendDatachannel(streamId, ppid, msg_str.c_str(), msg_str.size()); + } + }); +#else + WarnL << "未启用webrtc功能, 编译时请开启ENABLE_WEBRTC"; +#endif +} diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 9a31b3e6..f31546fe 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -58,6 +58,12 @@ const string kBroadcastStreamNoneReader = "kBroadcastStreamNoneReader"; const string kBroadcastHttpBeforeAccess = "kBroadcastHttpBeforeAccess"; const string kBroadcastSendRtpStopped = "kBroadcastSendRtpStopped"; const string kBroadcastRtpServerTimeout = "kBroadcastRtpServerTimeout"; +const string kBroadcastRtcSctpConnecting = "kBroadcastRtcSctpConnecting"; +const string kBroadcastRtcSctpConnected = "kBroadcastRtcSctpConnected"; +const string kBroadcastRtcSctpFailed = "kBroadcastRtcSctpFailed"; +const string kBroadcastRtcSctpClosed = "kBroadcastRtcSctpClosed"; +const string kBroadcastRtcSctpSend = "kBroadcastRtcSctpSend"; +const string kBroadcastRtcSctpReceived = "kBroadcastRtcSctpReceived"; } // namespace Broadcast diff --git a/src/Common/config.h b/src/Common/config.h index cd335ef4..3196654b 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -109,6 +109,21 @@ extern const std::string kBroadcastReloadConfig; extern const std::string kBroadcastRtpServerTimeout; #define BroadcastRtpServerTimeoutArgs uint16_t &local_port, const string &stream_id,int &tcp_mode, bool &re_use_port, uint32_t &ssrc +// rtc transport sctp 连接状态 +extern const std::string kBroadcastRtcSctpConnecting; +extern const std::string kBroadcastRtcSctpConnected; +extern const std::string kBroadcastRtcSctpFailed; +extern const std::string kBroadcastRtcSctpClosed; +#define BroadcastRtcSctpConnectArgs WebRtcTransport& sender + +// rtc transport sctp 发送数据 +extern const std::string kBroadcastRtcSctpSend; +#define BroadcastRtcSctpSendArgs WebRtcTransport& sender, const uint8_t *&data, size_t& len + +// rtc transport sctp 接收数据 +extern const std::string kBroadcastRtcSctpReceived; +#define BroadcastRtcSctpReceivedArgs WebRtcTransport& sender, uint16_t &streamId, uint32_t &ppid, const uint8_t *&msg, size_t &len + #define ReloadConfigTag ((void *)(0xFF)) #define RELOAD_KEY(arg, key) \ do { \ diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 7a39b8da..14bcf19e 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -55,6 +55,9 @@ const string kStartBitrate = RTC_FIELD "start_bitrate"; const string kMaxBitrate = RTC_FIELD "max_bitrate"; const string kMinBitrate = RTC_FIELD "min_bitrate"; +// 数据通道设置 +const string kDataChannelEcho = RTC_FIELD "datachannel_echo"; + static onceToken token([]() { mINI::Instance()[kTimeOutSec] = 15; mINI::Instance()[kExternIP] = ""; @@ -65,6 +68,8 @@ static onceToken token([]() { mINI::Instance()[kStartBitrate] = 0; mINI::Instance()[kMaxBitrate] = 0; mINI::Instance()[kMinBitrate] = 0; + + mINI::Instance()[kDataChannelEcho] = true; }); } // namespace RTC @@ -250,22 +255,47 @@ void WebRtcTransport::OnDtlsTransportApplicationDataReceived( #ifdef ENABLE_SCTP void WebRtcTransport::OnSctpAssociationConnecting(RTC::SctpAssociation *sctpAssociation) { TraceL << getIdentifier(); + try { + NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpConnecting, *this); + } catch (std::exception &ex) { + WarnL << "Exception occurred: " << ex.what(); + } } void WebRtcTransport::OnSctpAssociationConnected(RTC::SctpAssociation *sctpAssociation) { InfoL << getIdentifier(); + try { + NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpConnected, *this); + } catch (std::exception &ex) { + WarnL << "Exception occurred: " << ex.what(); + } } void WebRtcTransport::OnSctpAssociationFailed(RTC::SctpAssociation *sctpAssociation) { WarnL << getIdentifier(); + try { + NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpFailed, *this); + } catch (std::exception &ex) { + WarnL << "Exception occurred: " << ex.what(); + } } void WebRtcTransport::OnSctpAssociationClosed(RTC::SctpAssociation *sctpAssociation) { InfoL << getIdentifier(); + try { + NOTICE_EMIT(BroadcastRtcSctpConnectArgs, Broadcast::kBroadcastRtcSctpClosed, *this); + } catch (std::exception &ex) { + WarnL << "Exception occurred: " << ex.what(); + } } void WebRtcTransport::OnSctpAssociationSendData( RTC::SctpAssociation *sctpAssociation, const uint8_t *data, size_t len) { + try { + NOTICE_EMIT(BroadcastRtcSctpSendArgs, Broadcast::kBroadcastRtcSctpSend, *this, data, len); + } catch (std::exception &ex) { + WarnL << "Exception occurred: " << ex.what(); + } _dtls_transport->SendApplicationData(data, len); } @@ -274,8 +304,18 @@ void WebRtcTransport::OnSctpAssociationMessageReceived( InfoL << getIdentifier() << " " << streamId << " " << ppid << " " << len << " " << string((char *)msg, len); RTC::SctpStreamParameters params; params.streamId = streamId; - // 回显数据 - _sctp->SendSctpMessage(params, ppid, msg, len); + + GET_CONFIG(bool, datachannel_echo, Rtc::kDataChannelEcho); + if (datachannel_echo) { + // 回显数据 + _sctp->SendSctpMessage(params, ppid, msg, len); + } + + try { + NOTICE_EMIT(BroadcastRtcSctpReceivedArgs, Broadcast::kBroadcastRtcSctpReceived, *this, streamId, ppid, msg, len); + } catch (std::exception &ex) { + WarnL << "Exception occurred: " << ex.what(); + } } #endif