350 lines
20 KiB
Python
350 lines
20 KiB
Python
# -*- coding: utf-8 -*-
|
||
import os
|
||
from multiprocessing import Process, Queue
|
||
from os import getpid
|
||
from time import time, sleep
|
||
from traceback import format_exc
|
||
|
||
import psutil
|
||
from loguru import logger
|
||
|
||
from util.LogUtils import init_log
|
||
from concurrency.FileUploadThread import ImageFileUpload
|
||
from entity.FeedBack import message_feedback
|
||
from enums.AnalysisStatusEnum import AnalysisStatus
|
||
from enums.ExceptionEnum import ExceptionType
|
||
from exception.CustomerException import ServiceException
|
||
from util.Cv2Utils import check_video_stream, build_video_info, pull_read_video_stream, clear_pull_p
|
||
from util.QueUtil import get_no_block_queue, put_queue, clear_queue, put_queue_result
|
||
|
||
|
||
class PullVideoStreamProcess2(Process):
    """Base process for the video pull-stream workers defined in this module.

    Holds the queues and request data shared by the concrete subclasses and
    provides helpers to send commands to the process, start the image upload
    thread, and perform the per-iteration health check.
    """

    __slots__ = ("_command_queue", "_msg", "_context", "_fb_queue", "_pull_queue", "_image_queue", "_analyse_type",
                 "_frame_num")

    def __init__(self, *args):
        """Create the process.

        Args:
            *args: exactly seven positional values, in this order:
                msg, context, fb_queue, pull_queue, image_queue,
                analyse_type, frame_num.

        Raises:
            ValueError: if ``args`` does not contain exactly seven values
                (raised by the tuple unpacking below).
        """
        super().__init__()
        # Owned by this object: queue used by sendCommand() to deliver commands to run().
        self._command_queue = Queue()

        # Values supplied by the caller.
        self._msg, self._context, self._fb_queue, self._pull_queue, self._image_queue, self._analyse_type, \
            self._frame_num = args

    def sendCommand(self, result):
        """Put a command onto the process's command queue.

        Args:
            result: the command object to enqueue; the run() loops in this
                module read it with ``.get("command")``.

        Raises:
            ServiceException: with the SERVICE_INNER_EXCEPTION code/message if
                the put fails for any reason (for example the 10 second
                timeout expires).
        """
        try:
            self._command_queue.put(result, timeout=10)
        except Exception:
            logger.error("添加队列超时异常:{}, requestId:{}", format_exc(), self._msg.get("request_id"))
            raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   ExceptionType.SERVICE_INNER_EXCEPTION.value[1])

    @staticmethod
    def start_File_upload(*args):
        """Create and start the image upload thread.

        Args:
            *args: exactly five positional values, in this order:
                fb_queue, context, msg, image_queue, analyse_type. They are
                forwarded unchanged to ``ImageFileUpload``.

        Returns:
            The started ``ImageFileUpload`` thread, marked as a daemon.
        """
        fb_queue, context, msg, image_queue, analyse_type = args
        image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type)
        # Thread.setDaemon() is deprecated since Python 3.10; the attribute is the supported spelling.
        image_thread.daemon = True
        image_thread.start()
        return image_thread

    @staticmethod
    def check(start_time, service_timeout, request_id, image_thread):
        """Verify that the task has not timed out and the upload thread is alive.

        Args:
            start_time: ``time()`` value captured when the task started.
            service_timeout: maximum allowed task duration in seconds.
            request_id: request identifier, used only for logging.
            image_thread: the image upload thread, or ``None`` to skip the
                liveness check.

        Raises:
            ServiceException: with the ANALYSE_TIMEOUT_EXCEPTION code/message
                when more than ``service_timeout`` seconds have elapsed.
            Exception: when ``image_thread`` is given but no longer alive.
        """
        if time() - start_time > service_timeout:
            logger.error("分析超时, requestId: {}", request_id)
            raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0],
                                   ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1])
        # Check that the image upload thread is still running.
        if image_thread is not None and not image_thread.is_alive():
            logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常, requestId:{}", request_id)
            raise Exception("未检测到图片上传线程活动,图片上传线程可能出现异常!")
|
||
|
||
|
||
class OnlinePullVideoStreamProcess2(PullVideoStreamProcess2):
    """Pull-stream process for a real-time video stream.

    Reads frames from ``msg["pull_url"]``, groups them into batches of
    ``frame_num`` frames and puts each batch onto the pull queue.
    """

    __slots__ = ()

    def run(self):
        """Process entry point: pull frames until stopped, timed out or failed.

        Messages put onto ``pull_queue`` (as visible in this method):
            * ``(4, (frame_list, frame_index_list, all_frames))`` - a batch of frames.
            * ``(1, code, msg)`` - sent on exit when an exception was recorded.
            * ``(2,)`` - sent on exit when no exception was recorded.

        Commands read from the command queue:
            * ``"stop"`` - leave the loop and send the exit message.
            * ``"stop_ex"`` - leave the loop without sending an exit message.

        All exceptions are caught and converted into the ``(1, code, msg)``
        exit message; nothing is re-raised to the caller.
        """
        # Bind everything to locals up front so the loop avoids repeated attribute lookups.
        context, msg, analyse_type = self._context, self._msg, self._analyse_type
        request_id, base_dir, env = msg["request_id"], context['base_dir'], context['env']
        pull_url, frame_num = msg["pull_url"], self._frame_num
        pull_stream_timeout = int(context["service"]["cv2_pull_stream_timeout"])
        read_stream_timeout = int(context["service"]["cv2_read_stream_timeout"])
        service_timeout = int(context["service"]["timeout"])
        command_queue, pull_queue, image_queue = self._command_queue, self._pull_queue, self._image_queue
        fb_queue = self._fb_queue
        image_thread, pull_p = None, None
        width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
        frame_list, frame_index_list = [], []
        # (code, msg) of the failure to report on exit, if any.
        ex = None
        # When False, the exit message to pull_queue is skipped in ``finally``.
        ex_status = True
        # Time at which pull_queue was first seen full; None while it has room.
        full_timeout = None
        try:
            # Initialise logging.
            init_log(base_dir, env)
            logger.info("开启实时视频拉流进程, requestId:{}", request_id)

            # Start the image upload thread.
            image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)

            # Initialise the pull-stream counters and timers.
            cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
            start_time, pull_start_time, read_start_time = time(), None, None
            while True:
                # Check for task timeout and that the image upload thread is healthy.
                self.check(start_time, service_timeout, request_id, image_thread)
                command_msg = get_no_block_queue(command_queue)
                if command_msg is not None:
                    if 'stop' == command_msg.get("command"):
                        logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                        break
                    if 'stop_ex' == command_msg.get("command"):
                        logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                        ex_status = False
                        break
                # Check whether the video information / pull-stream object is available.
                if check_video_stream(width, height):
                    # Flush the partial batch before (re)acquiring the stream information.
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                    if pull_start_time is None:
                        pull_start_time = time()
                    pull_stream_init_timeout = time() - pull_start_time
                    if pull_stream_init_timeout > pull_stream_timeout:
                        logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
                        raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
                    cv2_init_num += 1
                    width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                    if width is None:
                        sleep(1)
                        continue
                    # Stream information obtained: reset the retry timer and counter.
                    pull_start_time, cv2_init_num = None, 1
                frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height, width_height_3,
                                                                      w_2, h_2, request_id)
                if frame is None:
                    # Flush the partial batch, then retry until read_stream_timeout is exceeded.
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("获取帧为空, 开始重试: {}次, requestId: {}", init_pull_num, request_id)
                    if read_start_time is None:
                        read_start_time = time()
                    pull_stream_read_timeout = time() - read_start_time
                    if pull_stream_read_timeout > read_stream_timeout:
                        logger.info("拉流过程中断了重试超时, 超时时间: {}, requestId: {}", pull_stream_read_timeout,
                                    request_id)
                        raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                    init_pull_num += 1
                    continue
                init_pull_num, read_start_time = 1, None
                if pull_queue.full():
                    logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                    if full_timeout is None:
                        full_timeout = time()
                    if time() - full_timeout > 180:
                        logger.error("拉流队列阻塞异常, requestId: {}", request_id)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                    # Parent pid 1 is treated as "the parent process has died": clean up,
                    # report failure on the feedback queue and stop.
                    if psutil.Process(getpid()).ppid() == 1:
                        clear_pull_p(pull_p, request_id)
                        ex_status = False
                        for q in [command_queue, pull_queue, image_queue]:
                            clear_queue(q)
                        if image_thread and image_thread.is_alive():
                            put_queue(image_queue, (2, "stop"), timeout=1)
                            image_thread.join(120)
                        logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                        put_queue(fb_queue, message_feedback(request_id,
                                                             AnalysisStatus.FAILED.value,
                                                             analyse_type,
                                                             ExceptionType.NO_RESOURCES.value[0],
                                                             ExceptionType.NO_RESOURCES.value[1]))
                        break
                    # Queue still full: this frame is discarded and not counted.
                    del frame
                    continue
                full_timeout = None
                frame_list.append(frame)
                frame_index_list.append(concurrent_frame)
                if len(frame_list) >= frame_num:
                    put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                    frame_list, frame_index_list = [], []
                concurrent_frame += 1
                del frame
        except ServiceException as s:
            logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", format_exc(), pull_queue.qsize(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            clear_pull_p(pull_p, request_id)
            del frame_list, frame_index_list
            if ex_status:
                if ex:
                    # Use a separate name so the request ``msg`` dict is not overwritten.
                    code, err_msg = ex
                    r = put_queue_result(pull_queue, (1, code, err_msg), timeout=10)
                else:
                    r = put_queue_result(pull_queue, (2,), timeout=10)
                if r:
                    # Exit message delivered: wait up to 180 seconds for a "stop" command.
                    # NOTE(review): this polls without sleeping unless get_no_block_queue
                    # itself waits - confirm.
                    c_time = time()
                    while time() - c_time < 180:
                        command_msg = get_no_block_queue(command_queue)
                        if command_msg is not None:
                            if 'stop' == command_msg.get("command"):
                                logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                                if image_thread and image_thread.is_alive():
                                    put_queue(image_queue, (2, "stop"), timeout=1)
                                    logger.info("停止图片上传线程, requestId:{}", request_id)
                                    image_thread.join(120)
                                    logger.info("停止图片上传线程结束, requestId:{}", request_id)
                                break
            for q in [command_queue, pull_queue, image_queue]:
                clear_queue(q)
            if image_thread and image_thread.is_alive():
                put_queue(image_queue, (2, "stop"), timeout=1)
                logger.info("停止图片上传线程, requestId:{}", request_id)
                image_thread.join(120)
                logger.info("停止图片上传线程结束, requestId:{}", request_id)
            # image_thread is still None when start-up failed before the thread was
            # created; guard the call so ``finally`` cannot raise AttributeError.
            image_alive = image_thread is not None and image_thread.is_alive()
            logger.info("实时拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                        image_queue.qsize(), pull_queue.qsize(), image_alive, request_id)
|
||
|
||
|
||
class OfflinePullVideoStreamProcess2(PullVideoStreamProcess2):
    """Pull-stream process for an offline (finite) video.

    Reads frames from ``msg["pull_url"]`` until no more frames are returned,
    groups them into batches of ``frame_num`` frames and puts each batch onto
    the pull queue.
    """

    __slots__ = ()

    def run(self):
        """Process entry point: pull frames until the video ends, is stopped or fails.

        Messages put onto ``pull_queue`` (as visible in this method):
            * ``(4, (frame_list, frame_index_list, all_frames))`` - a batch of frames.
            * ``(1, code, msg)`` - sent on exit when an exception was recorded.
            * ``(2,)`` - sent on exit when no exception was recorded.

        Commands read from the command queue:
            * ``"stop"`` - leave the loop and send the exit message.
            * ``"stop_ex"`` - leave the loop without sending an exit message.

        All exceptions are caught and converted into the ``(1, code, msg)``
        exit message; nothing is re-raised to the caller.
        """
        msg, context, frame_num, analyse_type = self._msg, self._context, self._frame_num, self._analyse_type
        request_id, base_dir, env, pull_url = msg["request_id"], context['base_dir'], context['env'], msg["pull_url"]
        # ``ex`` holds the (code, msg) of the failure to report on exit, if any.
        ex, service_timeout = None, int(context["service"]["timeout"])
        command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, self._image_queue, \
            self._fb_queue
        image_thread, pull_p = None, None
        width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
        frame_list, frame_index_list = [], []
        # When False, the exit message to pull_queue is skipped in ``finally``.
        ex_status = True
        # Time at which pull_queue was first seen full; None while it has room.
        full_timeout = None
        try:
            # Initialise logging.
            init_log(base_dir, env)
            logger.info("开启离线视频拉流进程, requestId:{}", request_id)

            # Start the image upload thread.
            image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type)

            # Initialise the pull-stream counters and timer.
            cv2_init_num = 0
            concurrent_frame = 1
            start_time = time()
            while True:
                # Check for task timeout and that the image upload thread is healthy.
                self.check(start_time, service_timeout, request_id, image_thread)
                command_msg = get_no_block_queue(command_queue)
                if command_msg is not None:
                    if 'stop' == command_msg.get("command"):
                        logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                        break
                    if 'stop_ex' == command_msg.get("command"):
                        logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                        ex_status = False
                        break
                # Check whether the video information / pull-stream object is available.
                if check_video_stream(width, height):
                    # Flush the partial batch before (re)acquiring the video information.
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                    # Give up once the attempt counter has passed 3.
                    if cv2_init_num > 3:
                        clear_pull_p(pull_p, request_id)
                        logger.info("离线拉流重试失败, 重试次数: {}, requestId: {}", cv2_init_num, request_id)
                        raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
                                               ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
                    cv2_init_num += 1
                    sleep(1)
                    width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                    continue
                if pull_queue.full():
                    logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                    if full_timeout is None:
                        full_timeout = time()
                    if time() - full_timeout > 300:
                        logger.error("拉流队列阻塞超时, 请检查父进程是否正常!requestId: {}", request_id)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                    # Parent pid 1 is treated as "the parent process has died": clean up,
                    # report failure on the feedback queue and stop.
                    if psutil.Process(getpid()).ppid() == 1:
                        clear_pull_p(pull_p, request_id)
                        ex_status = False
                        for q in [command_queue, pull_queue, image_queue]:
                            clear_queue(q)
                        if image_thread and image_thread.is_alive():
                            put_queue(image_queue, (2, "stop"), timeout=1)
                            image_thread.join(120)
                        logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                        put_queue(fb_queue, message_feedback(request_id,
                                                             AnalysisStatus.FAILED.value,
                                                             analyse_type,
                                                             ExceptionType.NO_RESOURCES.value[0],
                                                             ExceptionType.NO_RESOURCES.value[1]))
                        break
                    # Queue still full: no frame is read this iteration, so none is lost.
                    continue
                full_timeout = None
                frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height, width_height_3,
                                                                      w_2, h_2, request_id)
                if frame is None:
                    # No frame returned: treated as the end of the video.
                    logger.info("总帧数: {}, 当前帧数: {}, requestId: {}", all_frames, concurrent_frame, request_id)
                    clear_pull_p(pull_p, request_id)
                    if frame_list:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=2, is_ex=False)
                        frame_list, frame_index_list = [], []
                    # Allow a tolerance of 100 frames against the reported total.
                    if concurrent_frame < all_frames - 100:
                        logger.info("离线拉流异常结束:requestId: {}", request_id)
                        raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                    logger.info("离线拉流线程结束, requestId: {}", request_id)
                    break
                frame_list.append(frame)
                frame_index_list.append(concurrent_frame)
                if len(frame_list) >= frame_num:
                    put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                    frame_list, frame_index_list = [], []
                concurrent_frame += 1
                del frame
        except ServiceException as s:
            # Log text corrected: this is the offline process, not the real-time one.
            logger.error("离线拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
            ex = s.code, s.msg
        except Exception:
            # Log text corrected: this is the offline process, not the real-time one.
            logger.error("离线拉流异常: {}, requestId:{}", format_exc(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            clear_pull_p(pull_p, request_id)
            del frame_list, frame_index_list
            if ex_status:
                if ex:
                    # Use a separate name so the request ``msg`` dict is not overwritten.
                    code, err_msg = ex
                    r = put_queue_result(pull_queue, (1, code, err_msg), timeout=10)
                else:
                    r = put_queue_result(pull_queue, (2,), timeout=10)
                if r:
                    # Exit message delivered: wait up to 180 seconds for a "stop" command.
                    # NOTE(review): this polls without sleeping unless get_no_block_queue
                    # itself waits - confirm.
                    c_time = time()
                    while time() - c_time < 180:
                        command_msg = get_no_block_queue(command_queue)
                        if command_msg is not None:
                            if 'stop' == command_msg.get("command"):
                                logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                                if image_thread and image_thread.is_alive():
                                    put_queue(image_queue, (2, "stop"), timeout=1)
                                    logger.info("停止图片上传线程, requestId:{}", request_id)
                                    image_thread.join(120)
                                    logger.info("停止图片上传线程结束, requestId:{}", request_id)
                                break
            for q in [command_queue, pull_queue, image_queue]:
                clear_queue(q)
            if image_thread and image_thread.is_alive():
                put_queue(image_queue, (2, "stop"), timeout=1)
                logger.info("停止图片上传线程, requestId:{}", request_id)
                image_thread.join(120)
                logger.info("停止图片上传线程结束, requestId:{}", request_id)
            # image_thread is still None when start-up failed before the thread was
            # created; guard the call so ``finally`` cannot raise AttributeError.
            image_alive = image_thread is not None and image_thread.is_alive()
            logger.info("离线拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                        image_queue.qsize(), pull_queue.qsize(), image_alive, request_id)
|