// // TODO: add packet queue for player // #include "aio-rtmp-server.h" #include "aio-timeout.h" #include "aio-worker.h" #include "sys/sync.hpp" #include "flv-writer.h" #include "flv-proto.h" #include "flv-muxer.h" #include "flv-demuxer.h" #include #include #include #include "cpm/shared_ptr.h" #include #include #include struct rtmp_player_t { // TODO: add packet queue aio_rtmp_session_t* rtmp; struct flv_muxer_t* muxer; rtmp_player_t(aio_rtmp_session_t* rtmp) : rtmp(rtmp) { muxer = flv_muxer_create(&handler, this); } ~rtmp_player_t() { if(muxer) flv_muxer_destroy(muxer); } private: static int handler(void* param, int type, const void* data, size_t bytes, uint32_t timestamp) { rtmp_player_t* player = (rtmp_player_t*)param; switch (type) { case FLV_TYPE_SCRIPT: return aio_rtmp_server_send_script(player->rtmp, data, bytes, timestamp); case FLV_TYPE_AUDIO: return aio_rtmp_server_send_audio(player->rtmp, data, bytes, timestamp); case FLV_TYPE_VIDEO: return aio_rtmp_server_send_video(player->rtmp, data, bytes, timestamp); default: assert(0); return -1; } } }; struct rtmp_source_t { ThreadLocker locker; struct flv_demuxer_t* demuxer; std::list > players; rtmp_source_t() { demuxer = flv_demuxer_create(handler, this); } ~rtmp_source_t() { if (demuxer) flv_demuxer_destroy(demuxer); } private: static int handler(void* param, int codec, const void* data, size_t bytes, uint32_t pts, uint32_t dts, int flags) { int r = 0; rtmp_source_t* s = (rtmp_source_t*)param; AutoThreadLocker locker(s->locker); for (auto it = s->players.begin(); it != s->players.end(); ++it) { // TODO: push to packet queue switch (codec) { case FLV_VIDEO_H264: r = flv_muxer_avc((*it)->muxer, data, bytes, pts, dts); break; case FLV_VIDEO_H265: r = flv_muxer_hevc((*it)->muxer, data, bytes, pts, dts); break; case FLV_AUDIO_AAC: r = flv_muxer_aac((*it)->muxer, data, bytes, pts, dts); break; case FLV_AUDIO_MP3: r = flv_muxer_mp3((*it)->muxer, data, bytes, pts, dts); break; case FLV_VIDEO_AVCC: case FLV_VIDEO_HVCC: case FLV_AUDIO_ASC: break; // ignore default: assert(0); } } return 0; // ignore error } }; static ThreadLocker s_locker; static std::map > s_lives; static aio_rtmp_userptr_t aio_rtmp_server_onpublish(void* param, aio_rtmp_session_t* /*session*/, const char* app, const char* stream, const char* type) { printf("aio_rtmp_server_onpublish(%s, %s, %s)\n", app, stream, type); std::string key(app); key += "/"; key += stream; std::shared_ptr source(new rtmp_source_t); AutoThreadLocker locker(s_locker); assert(s_lives.find(key) == s_lives.end()); s_lives[key] = source; return source.get(); } static int aio_rtmp_server_onscript(aio_rtmp_userptr_t ptr, const void* script, size_t bytes, uint32_t timestamp) { struct rtmp_source_t* s = (struct rtmp_source_t*)ptr; AutoThreadLocker locker(s->locker); return flv_demuxer_input(s->demuxer, FLV_TYPE_SCRIPT, script, bytes, timestamp); } static int aio_rtmp_server_onvideo(aio_rtmp_userptr_t ptr, const void* data, size_t bytes, uint32_t timestamp) { struct rtmp_source_t* s = (struct rtmp_source_t*)ptr; AutoThreadLocker locker(s->locker); return flv_demuxer_input(s->demuxer, FLV_TYPE_VIDEO, data, bytes, timestamp); } static int aio_rtmp_server_onaudio(aio_rtmp_userptr_t ptr, const void* data, size_t bytes, uint32_t timestamp) { struct rtmp_source_t* s = (struct rtmp_source_t*)ptr; AutoThreadLocker locker(s->locker); return flv_demuxer_input(s->demuxer, FLV_TYPE_AUDIO, data, bytes, timestamp); } static void aio_rtmp_server_onsend(aio_rtmp_userptr_t /*ptr*/, size_t /*bytes*/) { } static void aio_rtmp_server_onclose(aio_rtmp_userptr_t ptr) { AutoThreadLocker locker(s_locker); for (auto it = s_lives.begin(); it != s_lives.end(); ++it) { std::shared_ptr& s = it->second; if (ptr == s.get()) { s_lives.erase(it); return; } AutoThreadLocker l(s->locker); for (auto j = s->players.begin(); j != s->players.end(); ++j) { if (j->get() == ptr) { s->players.erase(j); return; } } } } static aio_rtmp_userptr_t aio_rtmp_server_onplay(void* /*param*/, aio_rtmp_session_t* session, const char* app, const char* stream, double start, double duration, uint8_t reset) { printf("aio_rtmp_server_onplay(%s, %s, %f, %f, %d)\n", app, stream, start, duration, (int)reset); std::string key(app); key += "/"; key += stream; std::shared_ptr s; { AutoThreadLocker locker(s_locker); auto it = s_lives.find(key); if (it == s_lives.end()) { printf("source(%s, %s) not found\n", app, stream); return NULL; } s = it->second; for (auto j = s->players.begin(); j != s->players.end(); ++j) { if (j->get()->rtmp == session) { printf("rtmp session(%s, %s) exist\n", app, stream); return j->get(); } } } std::shared_ptr player(new rtmp_player_t(session)); AutoThreadLocker locker(s->locker); s->players.push_back(player); return player.get(); } static int aio_rtmp_server_onpause(aio_rtmp_userptr_t /*ptr*/, int pause, uint32_t ms) { printf("aio_rtmp_server_onpause(%d, %u)\n", pause, (unsigned int)ms); return 0; } static int aio_rtmp_server_onseek(aio_rtmp_userptr_t /*ptr*/, uint32_t ms) { printf("aio_rtmp_server_onseek(%u)\n", (unsigned int)ms); return 0; } void rtmp_server_forward_aio_test(const char* ip, int port) { aio_rtmp_server_t* rtmp; struct aio_rtmp_server_handler_t handler; memset(&handler, 0, sizeof(handler)); handler.onsend = aio_rtmp_server_onsend; handler.onplay = aio_rtmp_server_onplay; handler.onpause = aio_rtmp_server_onpause; handler.onseek = aio_rtmp_server_onseek; handler.onpublish = aio_rtmp_server_onpublish; handler.onscript = aio_rtmp_server_onscript; handler.onaudio = aio_rtmp_server_onaudio; handler.onvideo = aio_rtmp_server_onvideo; handler.onclose = aio_rtmp_server_onclose; aio_worker_init(8); rtmp = aio_rtmp_server_create(ip, port, &handler, NULL); while ('q' != getchar()) { } aio_rtmp_server_destroy(rtmp); aio_worker_clean(8); }