# -*-coding:utf-8 -*-
from json import loads
from traceback import format_exc

from fastapi import APIRouter
from loguru import logger

from bean.Result import JsonResult
from common.Constant import CHANNEL_LIST_URL, UPDATE_CHANNEL_URL
from enums.HttpExceptionEnum import EasyExceptionType
from util.RequestUtil import HttpRequests

router = APIRouter()


def _error(exc_type):
    """Build the error ``JsonResult`` for an ``EasyExceptionType`` member.

    The member's ``value`` is indexed as ``(code, msg)``.
    """
    return JsonResult.error(code=exc_type.value[0], msg=exc_type.value[1])


def _fetch_channel_list(request, headers):
    """Fetch the channel list from ``CHANNEL_LIST_URL``.

    Returns a ``(channels, failure)`` pair. On success ``channels`` is the
    non-empty ``"ChannelList"`` value of the decoded JSON body and ``failure``
    is ``None``. On a non-200 status or a missing/empty list, ``channels`` is
    ``None`` and ``failure`` is the ``JsonResult`` the handler should return.

    Exceptions from the request or from JSON decoding propagate to the caller.
    """
    resp = request.send_request('GET', CHANNEL_LIST_URL, headers=headers, timeout=30)
    if resp.status_code != 200:
        logger.error("获取任务列表失败! 状态码: {}, {}", resp.status_code, resp.__dict__)
        return None, _error(EasyExceptionType.EASY_TASK_GET_FAIL)
    channels = loads(resp.content.decode('utf-8')).get("ChannelList")
    if channels is None or len(channels) == 0:
        logger.error("获取EasyRtmpLive任务失败!")
        return None, _error(EasyExceptionType.EASY_TASK_GET_FAIL)
    return channels, None


def _build_update_url(channel, enable):
    """Return the update-channel URL for ``channel`` with the given ``enable``.

    ``channel`` is not mutated, so the helper can be called repeatedly (for
    example once to enable and once to roll back). The source URL is wrapped
    in double quotes, exactly as the original request did.

    Raises ``KeyError`` if ``channel`` lacks one of the expected keys.
    """
    # NOTE(review): values are interpolated without URL-encoding; a name or
    # URL containing '&' or '=' would corrupt the query string. Confirm what
    # the EasyRtmpLive endpoint expects before switching to urlencode().
    quoted_src = '"%s"' % channel["srcURL"]
    query = (
        'indexCode=%s&name=%s&srcURL=%s&connectType=%s&timeout=5'
        '&mediaType=%s&dstURL=%s&dstFormat=%s&enable=%s'
    ) % (
        channel["indexcode"],
        channel["name"],
        quoted_src,
        channel["connectType"],
        channel["mediaType"],
        channel["dstURL"],
        channel["dstFormat"],
        enable,
    )
    return UPDATE_CHANNEL_URL % query


@router.post("/rtmp/start", tags=['EasyRtmpLive启动接口'], response_model=JsonResult)
async def startPushStream():
    """Enable the single configured EasyRtmpLive channel.

    Reads the channel list, refuses to proceed unless exactly one channel
    exists and it is not already enabled, then requests an update with
    ``enable=true``. If that request does not return HTTP 200, a second
    request with ``enable=false`` is sent as a rollback.

    Returns ``JsonResult.success()`` when the update responds with the body
    ``"OK"``; otherwise a ``JsonResult.error`` built from the matching
    ``EasyExceptionType`` member. Exceptions are logged and converted to
    ``EASY_TASK_START_FAIL``; they are never raised to the framework.
    """
    logger.info("EasyRtmpLive启动接口!!")
    # NOTE(review): send_request looks like a blocking call made from an
    # async handler, which would stall the event loop for up to the timeout.
    # Confirm against HttpRequests before changing the handler to `def`.
    request = HttpRequests()
    headers = {'content-type': "application/json"}
    try:
        channels, failure = _fetch_channel_list(request, headers)
        if failure is not None:
            return failure
        if len(channels) > 1:
            logger.error("EasyRtmpLive任务限制, 限制1个任务!")
            return _error(EasyExceptionType.EASY_TASK_LIMIT)

        channel = channels[0]
        # NOTE(review): assumes the API reports `enable` as the string "true",
        # not a JSON boolean - confirm against a real response.
        if channel["enable"] == "true":
            logger.error("任务正在执行, 请稍后再试!")
            return _error(EasyExceptionType.TASK_IS_EXECUTING)

        u_resp = request.send_request(
            'GET', _build_update_url(channel, "true"), headers=headers, timeout=30)
        if u_resp.status_code != 200:
            # Log the original failure first so its details survive even if
            # the rollback request below raises.
            logger.error("EasyRtmpLive启动失败! 状态码: {}, {}", u_resp.status_code, u_resp.__dict__)
            request.send_request(
                'GET', _build_update_url(channel, "false"), headers=headers, timeout=30)
            return _error(EasyExceptionType.EASY_TASK_START_FAIL)

        u_content = u_resp.content.decode('utf-8')
        if u_content == "OK":
            return JsonResult.success()
        # A 200 response with an unexpected body used to fail silently.
        logger.error("EasyRtmpLive启动失败! 响应内容: {}", u_content)
        return _error(EasyExceptionType.EASY_TASK_START_FAIL)
    except Exception:
        logger.error("EasyRtmpLive启动失败异常: {}", format_exc())
        return _error(EasyExceptionType.EASY_TASK_START_FAIL)
    finally:
        request.close_session()


@router.put("/rtmp/stop", tags=['EasyRtmpLive停止接口'], response_model=JsonResult)
async def stopPushStream():
    """Disable the single configured EasyRtmpLive channel.

    Reads the channel list, refuses to proceed unless exactly one channel
    exists, and returns success immediately when that channel is already
    disabled. Otherwise requests an update with ``enable=false``.

    Returns ``JsonResult.success()`` when the channel is already stopped or
    the update responds with the body ``"OK"``; otherwise a
    ``JsonResult.error`` built from the matching ``EasyExceptionType`` member.
    Exceptions are logged and converted to ``EASY_TASK_STOP_FAIL``; they are
    never raised to the framework.
    """
    logger.info("EasyRtmpLive停止接口!!")
    # NOTE(review): send_request looks like a blocking call made from an
    # async handler, which would stall the event loop for up to the timeout.
    # Confirm against HttpRequests before changing the handler to `def`.
    request = HttpRequests()
    headers = {'content-type': "application/json"}
    try:
        channels, failure = _fetch_channel_list(request, headers)
        if failure is not None:
            return failure
        if len(channels) > 1:
            logger.error("EasyRtmpLive任务限制, 限制1个任务")
            return _error(EasyExceptionType.EASY_TASK_LIMIT)

        channel = channels[0]
        # NOTE(review): assumes the API reports `enable` as the string
        # "false", not a JSON boolean - confirm against a real response.
        if channel["enable"] == "false":
            logger.info("EasyRtmpLive任务已停止")
            return JsonResult.success()

        u_resp = request.send_request(
            'GET', _build_update_url(channel, "false"), headers=headers, timeout=30)
        if u_resp.status_code != 200:
            logger.error("停止EasyRtmpLive失败! 状态码: {}, {}", u_resp.status_code, u_resp.__dict__)
            return _error(EasyExceptionType.EASY_TASK_STOP_FAIL)

        u_content = u_resp.content.decode('utf-8')
        if u_content == "OK":
            return JsonResult.success()
        # A 200 response with an unexpected body used to fail silently.
        logger.error("停止EasyRtmpLive失败! 响应内容: {}", u_content)
        return _error(EasyExceptionType.EASY_TASK_STOP_FAIL)
    except Exception:
        # Previously logged the "start failed" text copied from the start
        # handler, which made stop failures look like start failures.
        logger.error("EasyRtmpLive停止失败异常: {}", format_exc())
        return _error(EasyExceptionType.EASY_TASK_STOP_FAIL)
    finally:
        request.close_session()