tuoheng_algN/concurrency/PullVideoStreamProcess.py

353 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
from multiprocessing import Process, Queue
from os import getpid
from time import time, sleep
from traceback import format_exc
import psutil
from loguru import logger
from concurrency.PullMqttThread import PullMqtt
from util.LogUtils import init_log
from concurrency.FileUploadThread import ImageFileUpload
from entity.FeedBack import message_feedback
from enums.AnalysisStatusEnum import AnalysisStatus
from enums.ExceptionEnum import ExceptionType
from exception.CustomerException import ServiceException
from util.Cv2Utils import check_video_stream, build_video_info, pull_read_video_stream, clear_pull_p
from util.QueUtil import get_no_block_queue, put_queue, clear_queue, put_queue_result
class PullVideoStreamProcess(Process):
    """Base class for the child processes that pull frames from a video source.

    The process owns a private command queue (filled through ``sendCommand``)
    and keeps references to the queues handed in by the creator. Subclasses
    implement ``run``.

    Positional arguments (exactly seven, unpacked in this order):
        msg, context, fb_queue, pull_queue, image_queue, analyse_type, frame_num

    Raises:
        ValueError: from the tuple unpacking when the number of positional
            arguments is not seven.
    """
    __slots__ = ("_command_queue", "_msg", "_context", "_fb_queue", "_pull_queue", "_image_queue", "_analyse_type",
                 "_frame_num", "_mqtt_list")

    def __init__(self, *args):
        super().__init__()
        # Built-in: queue used to deliver commands to this process.
        self._command_queue = Queue()
        # Supplied by the caller.
        self._msg, self._context, self._fb_queue, self._pull_queue, self._image_queue, self._analyse_type, \
            self._frame_num = args
        # Shared list handed to both the MQTT thread and the image upload thread.
        self._mqtt_list = []

    def sendCommand(self, result):
        """Put ``result`` on this process's command queue.

        NOTE(review): ``is_ex=True`` presumably makes ``put_queue`` raise when the
        put fails within the 2 second timeout -- confirm in util.QueUtil.
        """
        put_queue(self._command_queue, result, timeout=2, is_ex=True)

    @staticmethod
    def start_File_upload(fb_queue, context, msg, image_queue, analyse_type, mqtt_list):
        """Create, daemonize and start the image upload thread; return it."""
        image_thread = ImageFileUpload(fb_queue, context, msg, image_queue, analyse_type, mqtt_list)
        # ``Thread.setDaemon`` is deprecated since Python 3.10; set the attribute instead.
        image_thread.daemon = True
        image_thread.start()
        return image_thread

    @staticmethod
    def start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context):
        """Create, daemonize and start the MQTT pulling thread; return it."""
        mqtt_thread = PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)
        # ``Thread.setDaemon`` is deprecated since Python 3.10; set the attribute instead.
        mqtt_thread.daemon = True
        mqtt_thread.start()
        return mqtt_thread

    @staticmethod
    def check(start_time, service_timeout, request_id, image_thread):
        """Health check executed at the top of every pull-loop iteration.

        Args:
            start_time: ``time()`` value captured when the task started.
            service_timeout: maximum allowed run time in seconds.
            request_id: request identifier, used only for logging.
            image_thread: image upload thread, or ``None`` to skip the liveness check.

        Raises:
            ServiceException: the task has been running longer than ``service_timeout``.
            Exception: the image upload thread exists but is no longer alive.
        """
        if time() - start_time > service_timeout:
            logger.error("拉流进程运行超时, requestId: {}", request_id)
            raise ServiceException(ExceptionType.TASK_EXCUTE_TIMEOUT.value[0],
                                   ExceptionType.TASK_EXCUTE_TIMEOUT.value[1])
        # Verify that the image upload thread is still running.
        if image_thread and not image_thread.is_alive():
            logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常, requestId:{}", request_id)
            raise Exception("未检测到图片上传线程活动,图片上传线程可能出现异常!")
class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
    """Child process that pulls frames from a live (real-time) stream."""
    __slots__ = ()

    def run(self):
        """Pull frames from ``msg["pull_url"]`` and push them in batches onto the pull queue.

        Messages written by this method:
            * pull queue ``(4, (frame_list, frame_index_list, all_frames))`` -- a batch of frames;
            * pull queue ``(1, code, msg)`` -- the loop ended with an error;
            * pull queue ``(2,)`` -- the loop ended without an error;
            * image queue ``(2, command)`` -- forwarded ``algStart``/``algStop`` or ``"stop"``.

        Commands read from the command queue: ``stop``, ``stop_ex`` (stop without
        reporting a result on the pull queue), ``algStart`` and ``algStop``.

        Exceptions raised inside the pull loop are caught, logged and converted
        into the ``(1, code, msg)`` result.
        """
        # Bind everything to locals up front to avoid repeated attribute lookups in the loop.
        context, msg, analyse_type, frame_num = self._context, self._msg, self._analyse_type, self._frame_num
        base_dir, env, service = context['base_dir'], context['env'], context["service"]
        request_id, pull_url = msg["request_id"], msg["pull_url"]
        pull_stream_timeout, read_stream_timeout, service_timeout = int(service["cv2_pull_stream_timeout"]), \
            int(service["cv2_read_stream_timeout"]), int(service["timeout"]) + 120
        command_queue, pull_queue, image_queue, fb_queue, mqtt_list = self._command_queue, self._pull_queue, \
            self._image_queue, self._fb_queue, self._mqtt_list
        image_thread, ex = None, None
        width, height, width_height_3, all_frames, w_2, h_2, pull_p = None, None, None, 0, None, None, None
        frame_list, frame_index_list = [], []
        # When False the finally block does not report a result on the pull queue.
        ex_status = True
        try:
            # Initialise logging for this process.
            init_log(base_dir, env)
            logger.info("开启启动实时视频拉流进程, requestId:{},pid:{},ppid:{}", request_id, os.getpid(), os.getppid())
            # Start the MQTT thread when enabled in the service configuration.
            if service["mqtt_flag"] == 1:
                self.start_PullMqtt(fb_queue, mqtt_list, request_id, analyse_type, context)
            # Start the image upload thread.
            image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type, mqtt_list)
            cv2_init_num, init_pull_num, concurrent_frame = 0, 1, 1
            start_time, pull_stream_start_time, read_start_time, full_timeout = time(), None, None, None
            while True:
                # Check for overall task timeout and that the image upload thread is alive.
                self.check(start_time, service_timeout, request_id, image_thread)
                command_msg = get_no_block_queue(command_queue)
                if command_msg is not None:
                    if 'stop' == command_msg.get("command"):
                        logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                        break
                    if 'stop_ex' == command_msg.get("command"):
                        logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                        ex_status = False
                        break
                    if command_msg.get("command") in ['algStart', 'algStop']:
                        logger.info("拉流进程中requestId:{},向图片上传进程发送命令:{}", request_id,
                                    command_msg.get("command"))
                        put_queue(image_queue, (2, command_msg.get("command")), timeout=1)
                # Check whether the video info / pull object is available; (re)build it if not.
                if check_video_stream(width, height):
                    # Flush whatever frames were collected before the stream info was lost.
                    if len(frame_list) > 0:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                    if pull_stream_start_time is None:
                        pull_stream_start_time = time()
                    pull_stream_init_timeout = time() - pull_stream_start_time
                    if pull_stream_init_timeout > pull_stream_timeout:
                        logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, request_id)
                        # Timed out: surface the error (reported on the pull queue in the finally block).
                        raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1])
                    cv2_init_num += 1
                    width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                    if width is None:
                        sleep(1)
                        continue
                    pull_stream_start_time, cv2_init_num = None, 1
                frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height, width_height_3,
                                                                      w_2, h_2, request_id)
                if pull_queue.full():
                    logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                    if full_timeout is None:
                        full_timeout = time()
                    if time() - full_timeout > 180:
                        logger.error("拉流队列阻塞超时, 请检查父进程是否正常requestId: {}", request_id)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                    # Parent pid 1 is treated as "the parent process stopped abnormally".
                    if psutil.Process(getpid()).ppid() == 1:
                        clear_pull_p(pull_p, request_id)
                        ex_status = False
                        for q in [command_queue, pull_queue, image_queue]:
                            clear_queue(q)
                        if image_thread and image_thread.is_alive():
                            put_queue(image_queue, (2, "stop"), timeout=1)
                            image_thread.join(120)
                        logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                        put_queue(fb_queue, message_feedback(request_id, AnalysisStatus.FAILED.value,
                                                             analyse_type,
                                                             ExceptionType.NO_RESOURCES.value[0],
                                                             ExceptionType.NO_RESOURCES.value[1]), timeout=2)
                        break
                    # Queue is full: drop this frame and try again.
                    del frame
                    continue
                full_timeout = None
                if frame is None:
                    clear_pull_p(pull_p, request_id)
                    # Flush the partial batch before retrying.
                    if len(frame_list) > 0:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                        frame_list, frame_index_list = [], []
                    logger.info("获取帧为空, 开始重试: {}次, requestId: {}", init_pull_num, request_id)
                    if read_start_time is None:
                        read_start_time = time()
                    pull_stream_read_timeout = time() - read_start_time
                    if pull_stream_read_timeout > read_stream_timeout:
                        logger.info("拉流过程中断了重试超时, 超时时间: {}, requestId: {}", pull_stream_read_timeout,
                                    request_id)
                        raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                    init_pull_num += 1
                    sleep(1)
                    continue
                # A frame was read successfully: reset the empty-frame retry state.
                init_pull_num, read_start_time = 1, None
                frame_list.append(frame)
                frame_index_list.append(concurrent_frame)
                if len(frame_list) >= frame_num:
                    put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                    frame_list, frame_index_list = [], []
                concurrent_frame += 1
                del frame
        except ServiceException as s:
            logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("实时拉流异常: {}, 队列大小:{}, requestId:{}", format_exc(), pull_queue.qsize(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            clear_pull_p(pull_p, request_id)
            del frame_list, frame_index_list
            if ex_status:
                # Report the outcome on the pull queue: (1, code, msg) on error, (2,) otherwise.
                if ex:
                    code, msg = ex
                    r = put_queue_result(pull_queue, (1, code, msg), timeout=10)
                else:
                    r = put_queue_result(pull_queue, (2,), timeout=10)
                if r:
                    # Wait up to 60 seconds for a 'stop' command before cleaning up.
                    # NOTE(review): this loop polls without sleeping; if get_no_block_queue
                    # returns immediately this is a busy-wait -- consider a short sleep.
                    c_time = time()
                    while time() - c_time < 60:
                        command_msg = get_no_block_queue(command_queue)
                        if command_msg is not None:
                            if 'stop' == command_msg.get("command"):
                                logger.info("开始停止实时拉流进程, requestId:{}", request_id)
                                if image_thread and image_thread.is_alive():
                                    put_queue(image_queue, (2, "stop"), timeout=1)
                                    logger.info("停止图片上传线程, requestId:{}", request_id)
                                    image_thread.join(120)
                                    logger.info("停止图片上传线程结束, requestId:{}", request_id)
                                break
            for q in [command_queue, pull_queue, image_queue]:
                clear_queue(q)
            if image_thread and image_thread.is_alive():
                put_queue(image_queue, (2, "stop"), timeout=1)
                logger.info("停止图片上传线程, requestId:{}", request_id)
                image_thread.join(120)
                logger.info("停止图片上传线程结束, requestId:{}", request_id)
            # image_thread is still None when start-up failed before the upload thread was
            # created; guard the call so the finally block itself cannot raise AttributeError.
            image_alive = image_thread is not None and image_thread.is_alive()
            logger.info("实时拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                        image_queue.qsize(), pull_queue.qsize(), image_alive, request_id)
class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
    """Child process that pulls frames from an offline (recorded) video."""
    __slots__ = ()

    def run(self):
        """Read ``msg["pull_url"]`` to the end and push the frames in batches onto the pull queue.

        Messages written by this method:
            * pull queue ``(4, (frame_list, frame_index_list, all_frames))`` -- a batch of frames;
            * pull queue ``(1, code, msg)`` -- the loop ended with an error;
            * pull queue ``(2,)`` -- the loop ended without an error;
            * image queue ``(2, command)`` -- forwarded ``algStart``/``algStop`` or ``"stop"``.

        Commands read from the command queue: ``stop``, ``stop_ex`` (stop without
        reporting a result on the pull queue), ``algStart`` and ``algStop``.

        Exceptions raised inside the pull loop are caught, logged and converted
        into the ``(1, code, msg)`` result.
        """
        msg, context, frame_num, analyse_type = self._msg, self._context, self._frame_num, self._analyse_type
        request_id, base_dir, env, pull_url = msg["request_id"], context['base_dir'], context['env'], msg["pull_url"]
        ex, service_timeout, full_timeout = None, int(context["service"]["timeout"]) + 120, None
        command_queue, pull_queue, image_queue, fb_queue = self._command_queue, self._pull_queue, self._image_queue, \
            self._fb_queue
        image_thread, pull_p = None, None
        width, height, width_height_3, all_frames, w_2, h_2 = None, None, None, 0, None, None
        frame_list, frame_index_list = [], []
        # When False the finally block does not report a result on the pull queue.
        ex_status = True
        try:
            # Initialise logging for this process.
            init_log(base_dir, env)
            logger.info("开启离线视频拉流进程, requestId:{}", request_id)
            # Start the image upload thread (no MQTT list in offline mode).
            image_thread = self.start_File_upload(fb_queue, context, msg, image_queue, analyse_type, [])
            # Initialise the pull-loop counters.
            cv2_init_num, concurrent_frame = 0, 1
            start_time = time()
            while True:
                # Check for overall task timeout and that the image upload thread is alive.
                self.check(start_time, service_timeout, request_id, image_thread)
                command_msg = get_no_block_queue(command_queue)
                if command_msg is not None:
                    if 'stop' == command_msg.get("command"):
                        logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                        break
                    if 'stop_ex' == command_msg.get("command"):
                        logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                        ex_status = False
                        break
                    if command_msg.get("command") in ['algStart', 'algStop']:
                        logger.info("拉流进程中requestId:{},向图片上传进程发送命令:{}", request_id,
                                    command_msg.get("command"))
                        put_queue(image_queue, (2, command_msg.get("command")), timeout=1)
                # Check whether the video info / pull object is available; (re)build it if not.
                if check_video_stream(width, height):
                    logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, request_id)
                    if cv2_init_num > 3:
                        logger.info("离线拉流重试失败, 重试次数: {}, requestId: {}", cv2_init_num, request_id)
                        raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
                                               ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
                    cv2_init_num += 1
                    sleep(1)
                    width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, request_id)
                    continue
                if pull_queue.full():
                    # NOTE(review): this branch loops back without sleeping or reading a frame,
                    # so it may spin and log heavily while the queue stays full -- confirm.
                    logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), request_id)
                    if full_timeout is None:
                        full_timeout = time()
                    if time() - full_timeout > 180:
                        logger.error("pull队列阻塞超时请检测父进程是否正常requestId: {}", request_id)
                        raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                               ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
                    # Parent pid 1 is treated as "the parent process stopped abnormally".
                    if psutil.Process(getpid()).ppid() == 1:
                        clear_pull_p(pull_p, request_id)
                        ex_status = False
                        for q in [command_queue, pull_queue, image_queue]:
                            clear_queue(q)
                        # Same guard as the online process: only stop a thread that is running.
                        if image_thread and image_thread.is_alive():
                            put_queue(image_queue, (2, "stop"), timeout=1)
                            image_thread.join(120)
                        logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", request_id)
                        put_queue(fb_queue, message_feedback(request_id,
                                                             AnalysisStatus.FAILED.value,
                                                             analyse_type,
                                                             ExceptionType.NO_RESOURCES.value[0],
                                                             ExceptionType.NO_RESOURCES.value[1]), timeout=2)
                        break
                    continue
                full_timeout = None
                frame, pull_p, width, height = pull_read_video_stream(pull_p, pull_url, width, height,
                                                                      width_height_3, w_2, h_2, request_id)
                if frame is None:
                    logger.info("总帧数: {}, 当前帧数: {}, requestId: {}", all_frames, concurrent_frame, request_id)
                    clear_pull_p(pull_p, request_id)
                    # Flush the final partial batch.
                    if len(frame_list) > 0:
                        put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1)
                    # Allow a tolerance of 100 frames before treating the end as abnormal.
                    if concurrent_frame < all_frames - 100:
                        logger.info("离线拉流异常结束requestId: {}", request_id)
                        raise ServiceException(ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[0],
                                               ExceptionType.READSTREAM_TIMEOUT_EXCEPTION.value[1])
                    logger.info("离线拉流线程结束, requestId: {}", request_id)
                    break
                frame_list.append(frame)
                frame_index_list.append(concurrent_frame)
                if len(frame_list) >= frame_num:
                    put_queue(pull_queue, (4, (frame_list, frame_index_list, all_frames)), timeout=1, is_ex=True)
                    frame_list, frame_index_list = [], []
                concurrent_frame += 1
                del frame
        except ServiceException as s:
            logger.error("离线拉流异常: {}, 队列大小:{}, requestId:{}", s.msg, pull_queue.qsize(), request_id)
            ex = s.code, s.msg
        except Exception:
            logger.error("离线拉流异常: {}, requestId:{}", format_exc(), request_id)
            ex = ExceptionType.SERVICE_INNER_EXCEPTION.value[0], ExceptionType.SERVICE_INNER_EXCEPTION.value[1]
        finally:
            clear_pull_p(pull_p, request_id)
            del frame_list, frame_index_list
            if ex_status:
                # Report the outcome on the pull queue: (1, code, msg) on error, (2,) otherwise.
                if ex:
                    code, msg = ex
                    r = put_queue_result(pull_queue, (1, code, msg), timeout=10)
                else:
                    r = put_queue_result(pull_queue, (2,), timeout=10)
                if r:
                    # Wait up to 180 seconds for a 'stop' command before cleaning up.
                    # NOTE(review): this loop polls without sleeping; if get_no_block_queue
                    # returns immediately this is a busy-wait -- consider a short sleep.
                    c_time = time()
                    while time() - c_time < 180:
                        command_msg = get_no_block_queue(command_queue)
                        if command_msg is not None:
                            if 'stop' == command_msg.get("command"):
                                # Log text fixed: this is the offline process, not the real-time one.
                                logger.info("开始停止离线拉流进程, requestId:{}", request_id)
                                if image_thread and image_thread.is_alive():
                                    put_queue(image_queue, (2, "stop"), timeout=1)
                                    logger.info("停止图片上传线程, requestId:{}", request_id)
                                    image_thread.join(120)
                                    logger.info("停止图片上传线程结束, requestId:{}", request_id)
                                break
            for q in [command_queue, pull_queue, image_queue]:
                clear_queue(q)
            if image_thread and image_thread.is_alive():
                put_queue(image_queue, (2, "stop"), timeout=1)
                logger.info("停止图片上传线程, requestId:{}", request_id)
                image_thread.join(120)
                logger.info("停止图片上传线程结束, requestId:{}", request_id)
            # image_thread is still None when start-up failed before the upload thread was
            # created; guard the call so the finally block itself cannot raise AttributeError.
            image_alive = image_thread is not None and image_thread.is_alive()
            logger.info("离线拉流线程结束, 图片队列: {}, 拉流队列: {}, 图片进程的状态: {} requestId: {}",
                        image_queue.qsize(), pull_queue.qsize(), image_alive, request_id)