|
1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- # -*-coding:utf-8 -*-
- from multiprocessing import Queue
-
- from fastapi import APIRouter, Depends
- from loguru import logger
-
- from bean.Result import JsonResult
- from bean.Stream import PushStreamRequest, CallbackRequest
- from common.Constant import get_share_queue, get_task_record
- from enums.HttpExceptionEnum import StreamExceptionType
- from exception.CustomerException import ServiceException
- from util.QueUtil import put_queue
-
# Router instance on which the push-stream endpoints in this module are registered.
router = APIRouter()
-
-
@router.post("/start", tags=['推流启动接口'], response_model=JsonResult)
async def startPushStream(req: PushStreamRequest, msg_queue: Queue = Depends(get_share_queue),
                          task_record: dict = Depends(get_task_record)):
    """Handle ``POST /start``: request that a push-stream task be started.

    Args:
        req: Request body; its ``pullUrl``, ``pushUrl`` and ``callbackUrl``
            fields are forwarded in the start command.
        msg_queue: Queue injected via ``get_share_queue`` that receives the
            command.
        task_record: Mapping injected via ``get_task_record``; its
            ``"stream"`` entry is checked to detect an already-running task.

    Returns:
        The value of ``JsonResult.success()``.

    Raises:
        ServiceException: With the ``TASK_IS_EXECUTING`` code/message when
            ``task_record["stream"]`` is not ``None``.
    """
    logger.info("接收到推流启动接口调用, 请求体: {}", req.json())

    # Guard clause: refuse a second start while a stream entry is recorded.
    stream_busy = task_record["stream"] is not None
    if stream_busy:
        busy_error = StreamExceptionType.TASK_IS_EXECUTING.value
        raise ServiceException(code=busy_error[0], msg=busy_error[1])

    start_command = {
        "command": "start",
        "pull_url": req.pullUrl,
        "push_url": req.pushUrl,
        "callback_url": req.callbackUrl,
    }
    # NOTE(review): put_queue is invoked with timeout=2 from inside a
    # coroutine; if it can block, the event loop stalls meanwhile - confirm.
    put_queue(msg_queue, ("stream", start_command), timeout=2, is_throw_ex=True)
    return JsonResult.success()
-
-
@router.put("/stop", tags=['推流停止接口'], response_model=JsonResult)
async def stopPushStream(msg_queue: Queue = Depends(get_share_queue),
                         task_record: dict = Depends(get_task_record)):
    """Handle ``PUT /stop``: request that the current push-stream task stop.

    Args:
        msg_queue: Queue injected via ``get_share_queue`` that receives the
            stop command.
        task_record: Mapping injected via ``get_task_record``; its
            ``"stream"`` entry is checked to confirm a task exists.

    Returns:
        The value of ``JsonResult.success()``.

    Raises:
        ServiceException: With the ``TASK_NOT_EXISTS`` code/message when
            ``task_record["stream"]`` is ``None``.
    """
    logger.info("接收到推流停止接口调用")

    # Guard clause: nothing to stop when no stream entry is recorded.
    no_stream_task = task_record["stream"] is None
    if no_stream_task:
        missing_error = StreamExceptionType.TASK_NOT_EXISTS.value
        raise ServiceException(code=missing_error[0], msg=missing_error[1])

    stop_command = {"command": "stop"}
    # NOTE(review): put_queue is invoked with timeout=2 from inside a
    # coroutine; if it can block, the event loop stalls meanwhile - confirm.
    put_queue(msg_queue, ("stream", stop_command), timeout=2, is_throw_ex=True)
    return JsonResult.success()
-
-
@router.post("/callback", tags=['自验证回调接口'], response_model=JsonResult)
async def stopPushStream(req: CallbackRequest):
    """Handle ``POST /callback``: log the received callback body and acknowledge.

    Args:
        req: Callback request body; it is only serialized with ``json()``
            for logging.

    Returns:
        The value of ``JsonResult.success()``.
    """
    # NOTE(review): this function reuses the name of the PUT /stop handler
    # and therefore shadows it at module level. Both routes are still
    # registered because the decorator runs at definition time, but a
    # distinct name would be clearer - the name is kept here so existing
    # references do not break.
    # Log through loguru like the other handlers instead of print(), so the
    # message follows the configured log sinks and format.
    logger.info("接受回调请求, 请求体: {}", req.json())
    return JsonResult.success()
|