Browse Source

更新

tags/V2.7.7^2
chenyukun 1 year ago
parent
commit
f32c3b314e
6 changed files with 310 additions and 234 deletions
  1. +9
    -42
      .idea/workspace.xml
  2. +47
    -24
      concurrency/IntelligentRecognitionProcess.py
  3. +38
    -6
      concurrency/PullVideoStreamProcess.py
  4. +20
    -2
      service/Dispatcher.py
  5. +195
    -159
      test/内存优化/slots/test1.py
  6. +1
    -1
      util/CpuUtils.py

+ 9
- 42
.idea/workspace.xml View File

@@ -5,48 +5,12 @@
</component>
<component name="ChangeListManager">
<list default="true" id="4f7dccd9-8f92-4a6e-90cc-33890d102263" name="Changes" comment="Changes">
<change afterPath="$PROJECT_DIR$/util/RWUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/deployment.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/deployment.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/sshConfigs.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/sshConfigs.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/webServers.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/webServers.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/common/Constant.py" beforeDir="false" afterPath="$PROJECT_DIR$/common/Constant.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/common/YmlConstant.py" beforeDir="false" afterPath="$PROJECT_DIR$/common/YmlConstant.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/CommonThread.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/CommonThread.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/FeedbackThread.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/FeedbackThread.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/FileUploadThread.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/FileUploadThread.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/HeartbeatThread.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/HeartbeatThread.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/PullVideoStreamProcess.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/PullVideoStreamProcess.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/config/__init__.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/demo1.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/demo1.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dsp_application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/config/dsp_application.json" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dsp_master.py" beforeDir="false" afterPath="$PROJECT_DIR$/dsp_master.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/enums/ExceptionEnum.py" beforeDir="false" afterPath="$PROJECT_DIR$/enums/ExceptionEnum.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/enums/ModelTypeEnum.py" beforeDir="false" afterPath="$PROJECT_DIR$/enums/ModelTypeEnum.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/service/Dispatcher.py" beforeDir="false" afterPath="$PROJECT_DIR$/service/Dispatcher.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/__init__.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/__init__.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/cpu/test.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/cpu/test.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/demo/demo1.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/demo/demo1.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/str/test.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/str/test.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/while/test.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/while/test.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/读写/csv_test.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/读写/csv_test.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/读写/demo.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/读写/demo.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/路径/Test.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/路径/Test.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/集合/test.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/集合/test.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/AliyunSdk.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/AliyunSdk.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/Cv2Utils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/Cv2Utils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/FileUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/FileUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/GPUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/GPUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/ImageUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/ImageUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/ImgBaiduSdk.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/ImgBaiduSdk.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/KafkaUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/KafkaUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/LogUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/LogUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/ModelUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/ModelUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/OcrBaiduSdk.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/OcrBaiduSdk.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/TimeUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/TimeUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/YmlUtils.py" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/vodsdk/AliyunVodUploader.py" beforeDir="false" afterPath="$PROJECT_DIR$/vodsdk/AliyunVodUploader.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/内存优化/slots/test1.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/内存优化/slots/test1.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/CpuUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/CpuUtils.py" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -175,7 +139,7 @@
"WebServerToolWindowPanel.toolwindow.show.date": "false",
"WebServerToolWindowPanel.toolwindow.show.permissions": "false",
"WebServerToolWindowPanel.toolwindow.show.size": "false",
"last_opened_file_path": "D:/tuoheng/codenew/tuoheng_alg",
"last_opened_file_path": "D:/tuoheng/code1/alg_aaa/streamTool",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
@@ -562,7 +526,10 @@
<workItem from="1687219517554" duration="39842000" />
<workItem from="1687306657563" duration="921000" />
<workItem from="1687307950930" duration="44000" />
<workItem from="1687308509659" duration="5455000" />
<workItem from="1687308509659" duration="25425000" />
<workItem from="1687652018398" duration="8524000" />
<workItem from="1687736740408" duration="603000" />
<workItem from="1687737713032" duration="3021000" />
</task>
<servers />
</component>
@@ -600,7 +567,7 @@
</line-breakpoint>
<line-breakpoint enabled="true" suspend="THREAD" type="python-line">
<url>file://$PROJECT_DIR$/service/Dispatcher.py</url>
<line>332</line>
<line>350</line>
<option name="timeStamp" value="9" />
</line-breakpoint>
</breakpoints>
@@ -623,7 +590,7 @@
<SUITE FILE_PATH="coverage/tuoheng_alg$CpuUtils.coverage" NAME="CpuUtils 覆盖结果" MODIFIED="1686972304076" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg$ffmpeg12.coverage" NAME="ffmpeg12 覆盖结果" MODIFIED="1675391366890" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/ffmpeg11" />
<SUITE FILE_PATH="coverage/tuoheng_alg$Test__2_.coverage" NAME="Test (2) 覆盖结果" MODIFIED="1681796501563" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/路径" />
<SUITE FILE_PATH="coverage/tuoheng_alg$test1.coverage" NAME="test1 覆盖结果" MODIFIED="1687183593817" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/内存优化/slots" />
<SUITE FILE_PATH="coverage/tuoheng_alg$test1.coverage" NAME="test1 覆盖结果" MODIFIED="1687661266628" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/内存优化/slots" />
<SUITE FILE_PATH="coverage/tuoheng_alg$ossdemo.coverage" NAME="ossdemo 覆盖结果" MODIFIED="1681715255761" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/aliyun" />
<SUITE FILE_PATH="coverage/tuoheng_alg$Counter.coverage" NAME="Counter 覆盖结果" MODIFIED="1684894898737" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/collections" />
<SUITE FILE_PATH="coverage/tuoheng_alg$test__1_.coverage" NAME="test (1) 覆盖结果" MODIFIED="1687056062763" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/内存优化/slots" />

+ 47
- 24
concurrency/IntelligentRecognitionProcess.py View File

@@ -30,8 +30,7 @@ from enums.RecordingStatusEnum import RecordingStatus
from util import TimeUtils
from util.AliyunSdk import AliyunOssSdk, ThAliyunVodSdk
from util.CpuUtils import check_cpu
from util.Cv2Utils import Cv2Util, video_conjuncing, push_video_stream, write_or_video, write_ai_video, close_all_p, \
clear_push_p
from util.Cv2Utils import Cv2Util, video_conjuncing, push_video_stream, write_or_video, write_ai_video, close_all_p
from entity.FeedBack import message_feedback, recording_feedback
from exception.CustomerException import ServiceException
from util.ImageUtils import PictureWaterMark, url2Array, add_water_pic
@@ -46,33 +45,35 @@ from util.PlotsUtils import draw_painting_joint, get_label_arrays
class IntelligentRecognitionProcess(Process):
__slots__ = (
'_fbQueue',
'_eventQueue',
'eventQueue',
'_imageQueue',
'_hbQueue',
'_pullQueue',
'pullQueue',
'_context',
'_msg',
'_analyse_type',
'_base_dir',
'_gpu_name',
'_enable_add_water',
'_logo'
'_logo',
'start_proccess_time'
)

def __init__(self, param):
super().__init__()
# Param(self._fbQueue, msg, analysisType, self.__base_dir)
self._fbQueue = param.fbqueue
self._eventQueue = Queue()
self.eventQueue = Queue()
self._imageQueue = Queue()
self._hbQueue = Queue()
self._pullQueue = Queue(100)
self.pullQueue = Queue(100)
self._context = param.context
self._base_dir = param.base_dir
self._msg = param.msg
self._gpu_name = param.gpu_name
self._analyse_type = param.analyse_type
self._enable_add_water = bool(self._context["video"]["video_add_water"])
self.start_proccess_time = time.time()
if self._enable_add_water:
self._logo = self._msg.get("logo_url")
if self._logo:
@@ -84,8 +85,8 @@ class IntelligentRecognitionProcess(Process):

def clearPullQueue(self):
while True:
if self._pullQueue.qsize() > 0 or self._imageQueue.qsize() > 0:
getNoBlockQueue(self._pullQueue)
if self.pullQueue.qsize() > 0 or self._imageQueue.qsize() > 0:
getNoBlockQueue(self.pullQueue)
getNoBlockQueue(self._imageQueue)
else:
break
@@ -103,7 +104,7 @@ class IntelligentRecognitionProcess(Process):
# 给本进程发送事件
def sendEvent(self, eBody):
try:
self._eventQueue.put(eBody, timeout=10)
self.eventQueue.put(eBody, timeout=10)
except Exception:
logger.error("添加事件到队列超时异常:{}, requestId:{}", format_exc(), self._msg.get("request_id"))
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
@@ -427,7 +428,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
self._msg.get("request_id"))

def start_pull_stream(self):
pullProcess = OnlinePullVideoStreamProcess(self._msg, self._context, self._pullQueue,
pullProcess = OnlinePullVideoStreamProcess(self._msg, self._context, self.pullQueue,
self._fbQueue, self._hbQueue, self._imageQueue,
self._analyse_type, self._base_dir)
pullProcess.daemon = True
@@ -450,8 +451,8 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
init_log(self._base_dir)
logger.info("实时任务进行中!!!!!!!requestId: {}", requestId)
# 获取变量
eventQueue = self._eventQueue
pull_queue = self._pullQueue
eventQueue = self.eventQueue
pull_queue = self.pullQueue
image_queue = self._imageQueue
# 加载模型
model_array = self.get_model()
@@ -476,6 +477,18 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
if event_result:
cmdStr = event_result.get("command")
# 接收到停止指令
if "stop_ex" == cmdStr:
close_all_p(push_p, or_video_file, ai_video_file, requestId)
logger.info("实时任务开始停止, requestId: {}", requestId)
pullProcess.sendCommand({"command": 'stop_image_hb'})
feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
self._analyse_type,
ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1],
analyse_time=TimeUtils.now_date_to_str())}
self.waitPullStream(pullProcess)
break
# 接收到停止指令
if "stop" == cmdStr:
logger.info("实时任务开始停止, requestId: {}", requestId)
pullProcess.sendCommand({"command": 'stop_pull_stream'})
@@ -606,7 +619,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
self._msg.get("request_id"))

def start_pull_stream(self):
pullProcess = OfflinePullVideoStreamProcess(self._msg, self._context, self._pullQueue, self._fbQueue,
pullProcess = OfflinePullVideoStreamProcess(self._msg, self._context, self.pullQueue, self._fbQueue,
self._hbQueue, self._imageQueue, self._analyse_type, self._base_dir)
pullProcess.daemon = True
pullProcess.start()
@@ -616,8 +629,8 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
# cv2tool = None
requestId = self._msg.get("request_id")
push_url = self._msg.get("push_url")
eventQueue = self._eventQueue
pull_queue = self._pullQueue
eventQueue = self.eventQueue
pull_queue = self.pullQueue
image_queue = self._imageQueue
hb_queue = self._hbQueue
fb_queue = self._fbQueue
@@ -650,6 +663,16 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
event_result = getNoBlockQueue(eventQueue)
if event_result:
cmdStr = event_result.get("command")
if "stop_ex" == cmdStr:
logger.info("离线任务开始停止, requestId: {}", requestId)
pullProcess.sendCommand({"command": 'stop_image_hb'})
feedback = {"feedback": message_feedback(requestId, AnalysisStatus.FAILED.value,
self._analyse_type,
ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1],
analyse_time=TimeUtils.now_date_to_str())}
self.waitPullStream(pullProcess)
break
if "stop" == cmdStr:
logger.info("离线任务开始停止分析, requestId: {}", requestId)
pullProcess.sendCommand({"command": 'stop_pull_stream'})
@@ -1309,9 +1332,9 @@ class ScreenRecordingProcess(Process):
# Param(self.__fbQueue, msg, analysisType, self.__base_dir, self.__context, self.__gpu_name)
######################################### 初始化变量 #########################################
self._fbQueue = param.fbqueue
self._eventQueue = Queue()
self.eventQueue = Queue()
self._hbQueue = Queue()
self._pullQueue = Queue(200)
self.pullQueue = Queue(200)
self._context = param.context
self._msg = param.msg
self._analysisType = param.analyse_type
@@ -1326,19 +1349,19 @@ class ScreenRecordingProcess(Process):

def clearPullQueue(self):
while True:
if self._pullQueue.qsize() > 0:
if self.pullQueue.qsize() > 0:
self.getPullQueue()
else:
break

def sendEvent(self, eBody):
self._eventQueue.put(eBody)
self.eventQueue.put(eBody)

# 获取下一个事件
def getEvent(self):
eBody = None
try:
eBody = self._eventQueue.get(block=False)
eBody = self.eventQueue.get(block=False)
return eBody
except Exception as e:
pass
@@ -1347,7 +1370,7 @@ class ScreenRecordingProcess(Process):
def getPullQueue(self):
eBody = None
try:
eBody = self._pullQueue.get(block=False)
eBody = self.pullQueue.get(block=False)
return eBody
except Exception as e:
pass
@@ -1365,7 +1388,7 @@ class ScreenRecordingProcess(Process):
recording_video_url)})

def start_pull_stream_thread(self):
pullThread = RecordingPullStreamThread(self._msg, self._context, self._pullQueue, self._fbQueue)
pullThread = RecordingPullStreamThread(self._msg, self._context, self.pullQueue, self._fbQueue)
pullThread.setDaemon(True)
pullThread.start()
return pullThread
@@ -1393,7 +1416,7 @@ class ScreenRecordingProcess(Process):
def recordingFrame(self, cv2tool):
frames = []
status = None
for i in range(self._pullQueue.qsize()):
for i in range(self.pullQueue.qsize()):
frame_result = self.getPullQueue()
if frame_result is None:
continue

+ 38
- 6
concurrency/PullVideoStreamProcess.py View File

@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import os
import time
from multiprocessing import Process, Queue
from os import getpid
@@ -140,10 +141,12 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
image_queue = self._imageQueue
hb_queue = self._hbQueue
try:
base_dir = self._base_dir
# 加载日志框架
init_log(self._base_dir)
init_log(base_dir)
requestId = self._msg.get("request_id")
pull_url = self._msg.get("pull_url")

logger.info("开启视频拉流进程, requestId:{}", requestId)

pull_stream_timeout = int(self._context["service"]["cv2_pull_stream_timeout"])
@@ -169,6 +172,7 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
start_time = time.time()
pull_stream_start_time = time.time()
pull_stream_read_start_time = time.time()
kill_parent_process_timeout = time.time()
concurrent_frame = 1
stop_pull_stream_step = False
while True:
@@ -186,6 +190,7 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
if 'stop_image_hb' == command.get("command"):
putQueue(image_queue, {"command": "stop"}, requestId)
putQueue(hb_queue, {"command": "stop"}, requestId)
clear_pull_p(pull_p, requestId)
imageFileUpload.join(60 * 3)
hb.join(60 * 3)
logger.error("图片线程停止完成, requestId:{}", requestId)
@@ -227,8 +232,20 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
init_pull_num = 1
pull_stream_read_start_time = time.time()
if pull_queue.full():
logger.info("当前视频帧队列处理满队列状态, requestId: {}", requestId)
if not psutil.pid_exists(psutil.Process(getpid()).ppid()):
logger.info("pull拉流队列满了:{}, requestId: {}", os.getppid(), requestId)
# 如果一直有视频流,队列一直是满的,应该是父进程挂了,直接等待60退出
if time.time() - kill_parent_process_timeout > 60:
logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", requestId)
putQueue(fb_queue, {"feedback": message_feedback(requestId,
AnalysisStatus.FAILED.value,
self._analyse_type,
ExceptionType.NO_CPU_RESOURCES.value[0],
ExceptionType.NO_CPU_RESOURCES.value[1],
analyse_time=now_date_to_str())},
requestId)
break
# logger.info("当前视频帧队列处理满队列状态, requestId: {}", requestId)
if psutil.Process(getpid()).ppid() == 1:
logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", requestId)
putQueue(fb_queue, {"feedback": message_feedback(requestId,
AnalysisStatus.FAILED.value,
@@ -239,6 +256,7 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
requestId)
break
continue
kill_parent_process_timeout = time.time()
putQueue(pull_queue, ("4", frame, concurrent_frame, w_2, h_2, all_frames), requestId)
concurrent_frame += 1
except ServiceException as s:
@@ -272,8 +290,8 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
image_queue = self._imageQueue
hb_queue = self._hbQueue
try:
# 加载日志框架
init_log(self._base_dir)
base_dir = self._base_dir
init_log(base_dir)
requestId = self._msg.get("request_id")
pull_url = self._msg.get("original_url")
logger.info("开启离线视频拉流进程, requestId:{}", requestId)
@@ -289,6 +307,7 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
start_time = time.time()
concurrent_frame = 1
stop_pull_stream_step = False
kill_parent_process_timeout = time.time()
width, height, width_height_3, all_frames, w_2, h_2 = build_video_info(pull_url, requestId)
while True:
check(start_time, service_timeout, requestId, imageFileUpload, hb)
@@ -302,6 +321,7 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
if 'stop_image_hb' == command.get("command"):
putQueue(image_queue, {"command": "stop"}, requestId)
putQueue(hb_queue, {"command": "stop"}, requestId)
clear_pull_p(pull_p, requestId)
imageFileUpload.join(60 * 3)
hb.join(60 * 3)
logger.error("图片线程停止完成, requestId:{}", requestId)
@@ -311,7 +331,18 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
continue
if pull_queue.full():
logger.info("当前视频帧队列处理满队列状态, requestId: {}", requestId)
if not psutil.pid_exists(psutil.Process(getpid()).ppid()):
# 如果一直有视频流,队列一直是满的,应该是父进程挂了,直接等待60退出
if time.time() - kill_parent_process_timeout > 60:
logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", requestId)
putQueue(fb_queue, {"feedback": message_feedback(requestId,
AnalysisStatus.FAILED.value,
self._analyse_type,
ExceptionType.NO_CPU_RESOURCES.value[0],
ExceptionType.NO_CPU_RESOURCES.value[1],
analyse_time=now_date_to_str())},
requestId)
break
if psutil.Process(getpid()).ppid() == 1:
logger.info("检测到父进程异常停止, 请检测服务器资源是否负载过高, requestId: {}", requestId)
putQueue(fb_queue, {"feedback": message_feedback(requestId,
AnalysisStatus.FAILED.value,
@@ -322,6 +353,7 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
requestId)
break
continue
kill_parent_process_timeout = time.time()
check_vide_result = check_video_stream(width, height)
if check_vide_result:
logger.info("开始重新获取视频信息: {}次, requestId: {}", cv2_init_num, requestId)

+ 20
- 2
service/Dispatcher.py View File

@@ -99,11 +99,29 @@ class DispatcherService:
logger.info("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙")
# 循环消息处理
start_time = time.time()
persistent_time = time.time()
full_count = 0
while True:
try:
# 检查任务进程运行情况,去除活动的任务
self.check_process_task()
start_time = self.check_service_resource(start_time)
if len(self.__listeningProcesses) > 0:
now = time.time()
requestIds = list(self.__listeningProcesses.keys())
requestId = requestIds[-1]
task_process = self.__listeningProcesses.get(requestId)
end_time = now - task_process.start_proccess_time
if end_time > 80 and task_process.pullQueue.full() and time.time() - persistent_time < 10:
full_count += 1
if full_count > 2:
logger.error("服务器资源限制, 暂无资源可以使用! requestId:{}", requestId)
task_process.sendEvent({"command": "stop_ex"})
full_count = 0
persistent_time = time.time()
if end_time > 80 and task_process.pullQueue.full() and time.time() - persistent_time >= 10:
full_count = 0
persistent_time = time.time()
self.start_feedback_thread()
msg = customerKafkaConsumer.poll()
time.sleep(1)
@@ -331,7 +349,7 @@ class DispatcherService:

def online(self, message, analysisType):
if "start" == message.get("command"):
if self.__resource_status:
if self.__resource_status or len(self.__listeningProcesses) >= 5:
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startOnlineProcess(message, analysisType)
@@ -342,7 +360,7 @@ class DispatcherService:

def offline(self, message, analysisType):
if "start" == message.get("command"):
if self.__resource_status:
if self.__resource_status or len(self.__listeningProcesses) >= 5:
raise ServiceException(ExceptionType.NO_RESOURCES.value[0],
ExceptionType.NO_RESOURCES.value[1])
self.startOfflineProcess(message, analysisType)

+ 195
- 159
test/内存优化/slots/test1.py View File

@@ -1,164 +1,200 @@
# import timeit
# class MyClass:
# @staticmethod
# def my_static_method():
# pass
# def my_function():
# pass
# # 通过类名调用静态方法
# def test_static_method():
# MyClass.my_static_method()
# # 通过函数名调用函数方法
# def test_function():
# my_function()
# # 测试执行速度
# print("static method: ", timeit.timeit(test_static_method, number=10000000))
# print("function: ", timeit.timeit(test_function, number=10000000))
import copy
import os
import pickle
import time
import timeit
import traceback
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Queue
from threading import Thread

import cv2
import psutil

# def mu():
# aa ={
# 'half': 1,
# 'iou_thres': (1,2,3),
# 'allowedList': [1,3],
# 'aa': {"1":"2"},
# }
# bb = aa.copy()
# def mu1():
# aa ={
# 'half': 1,
# 'iou_thres': (1,2,3),
# 'allowedList': [1,3],
# 'aa': {"1":"2"},
# }
# # copy.deepcopy(aa)
# print(pickle.loads(pickle.dumps(aa)))
# mu1()
# print("static method1: ", timeit.timeit(mu, number=100000))
# print("static method2: ", timeit.timeit(mu1, number=100000))


# def aa():
# Point = namedtuple('Point', ('x', 'y', 'z'))
# aa = Point("1111111111", "22222222222", "333333333333")
# aa.x
# aa.y
# aa.z
# def aa1():
# aa = ["1111111111", "22222222222", "333333333333"]
# aa[0]
# aa[1]
# aa[2]
# print("static method1: ", timeit.timeit(aa, number=100000))
# print("static method2: ", timeit.timeit(aa1, number=100000))
# aa=[]
# aa[28] = "11111"
# print(aa)
# def aa1():
# # 获取当前进程的父进程的PID
# if psutil.pid_exists(psutil.Process(os.getpid()).ppid()):
# print("11111")
# else:
# print("2222")
# print("static method2: ", timeit.timeit(aa1, number=100))
# def aa():
# num=1
# while num < 100:
# num+=1
# time.sleep(100)
# dd = Thread(target=aa, args=())
# dd.setDaemon(True)
# dd.start()
# try:
# dd.join(1)
# except Exception:
# print(traceback.format_exc())
# a = Queue()
# aa = {"aa": "11"}
# a.put(aa)
# bb = a.get()
# bb["aa"] = "2222"
# print(aa)
# print(bb)


# def dd():
# w = ["1111", "2222", "3333"]
# a = w[2]
# # # import timeit
# # # class MyClass:
# # # @staticmethod
# # # def my_static_method():
# # # pass
# # # def my_function():
# # # pass
# # # # 通过类名调用静态方法
# # # def test_static_method():
# # # MyClass.my_static_method()
# # # # 通过函数名调用函数方法
# # # def test_function():
# # # my_function()
# # # # 测试执行速度
# # # print("static method: ", timeit.timeit(test_static_method, number=10000000))
# # # print("function: ", timeit.timeit(test_function, number=10000000))
# # import copy
# # import os
# # import pickle
# # import time
# # import timeit
# # import traceback
# # from collections import namedtuple
# # from concurrent.futures import ThreadPoolExecutor
# # from multiprocessing import Queue
# # from threading import Thread
# #
# # import cv2
# # import psutil
# #
# # # def mu():
# # # aa ={
# # # 'half': 1,
# # # 'iou_thres': (1,2,3),
# # # 'allowedList': [1,3],
# # # 'aa': {"1":"2"},
# # # }
# # # bb = aa.copy()
# # # def mu1():
# # # aa ={
# # # 'half': 1,
# # # 'iou_thres': (1,2,3),
# # # 'allowedList': [1,3],
# # # 'aa': {"1":"2"},
# # # }
# # # # copy.deepcopy(aa)
# # # print(pickle.loads(pickle.dumps(aa)))
# # # mu1()
# # # print("static method1: ", timeit.timeit(mu, number=100000))
# # # print("static method2: ", timeit.timeit(mu1, number=100000))
# #
# #
# # # def aa():
# # # Point = namedtuple('Point', ('x', 'y', 'z'))
# # # aa = Point("1111111111", "22222222222", "333333333333")
# # # aa.x
# # # aa.y
# # # aa.z
# # # def aa1():
# # # aa = ["1111111111", "22222222222", "333333333333"]
# # # aa[0]
# # # aa[1]
# # # aa[2]
# # # print("static method1: ", timeit.timeit(aa, number=100000))
# # # print("static method2: ", timeit.timeit(aa1, number=100000))
# # # aa=[]
# # # aa[28] = "11111"
# # # print(aa)
# # # def aa1():
# # # # 获取当前进程的父进程的PID
# # # if psutil.pid_exists(psutil.Process(os.getpid()).ppid()):
# # # print("11111")
# # # else:
# # # print("2222")
# # # print("static method2: ", timeit.timeit(aa1, number=100))
# # # def aa():
# # # num=1
# # # while num < 100:
# # # num+=1
# # # time.sleep(100)
# # # dd = Thread(target=aa, args=())
# # # dd.setDaemon(True)
# # # dd.start()
# # # try:
# # # dd.join(1)
# # # except Exception:
# # # print(traceback.format_exc())
# # # a = Queue()
# # # aa = {"aa": "11"}
# # # a.put(aa)
# # # bb = a.get()
# # # bb["aa"] = "2222"
# # # print(aa)
# # # print(bb)
# #
# #
# # # def dd():
# # # w = ["1111", "2222", "3333"]
# # # a = w[2]
# # #
# # #
# # # def cc():
# # # aa = {"aa": "1111", "bb": "2222", "cc": "3333"}
# # # t= aa["cc"]
# # #
# # # print("static method1: ", timeit.timeit(dd, number=1000))
# # # print("static method2: ", timeit.timeit(cc, number=1000))
# # import numpy as np
# #
# #
# # # # 创建一个numpy数组
# # # arr1 = np.array([[[1,2,3]], [[2,3, 4]]])
# # # # 使用copy()方法深度拷贝数组
# # # arr2 = arr1.copy()
# # # # 修改arr2中的元素
# # # arr2[0][0][1] = 5
# # # # 打印arr1和arr2
# # # print("arr1:", arr1)
# # # print("arr2:", arr2)
# # # def cop2():
# # # arr1 = np.array([[1, 2], [3, 4]])
# # # arr2 = arr1
# # # def cop():
# # # arr1 = np.array([[1, 2], [3, 4]])
# # # arr2 = arr1.copy()
# # # def cop1():
# # # arr1 = np.array([[1, 2], [3, 4]])
# # # arr2 = copy.deepcopy(arr1)
# # # print("static method1: ", timeit.timeit(cop2, number=1000))
# # # print("static method1: ", timeit.timeit(cop, number=1000))
# # # print("static method2: ", timeit.timeit(cop1, number=1000))
# #
# # # aa = {}
# # # def dd(aa):
# # # aa["aa"] = 1
# # #
# # # dd(aa)
# # # print(aa)
# #
# # def aa(num):
# # while True:
# # time.sleep(4)
# # num+= 1
# # raise Exception("1111")
# #
# #
# # def dd():
# # num = 1
# # with ThreadPoolExecutor(max_workers=1) as t:
# # ddd = t.submit(aa, num)
# # while True:
# # ddd.result()
# # time.sleep(1)
# # print(num)
# #
# # image = cv2.imread(r'D:\tuoheng\codenew\tuoheng_alg\image\logo.png')
# # image1 = np.array(image)
# # or_result, or_image = cv2.imencode(".jpg", image1)
# # aa = or_image.tobytes()
# #
# # numpy_array = np.frombuffer(aa, dtype=np.uint8)
# # img_bgr = cv2.cvtColor(numpy_array, cv2.COLOR_RGB2BGR)
# # cv2.sh
# # print(img_bgr)
# import os
# import time
# from os import getpid
#
# import psutil
#
# def cc():
# aa = {"aa": "1111", "bb": "2222", "cc": "3333"}
# t= aa["cc"]
#
# print("static method1: ", timeit.timeit(dd, number=1000))
# print("static method2: ", timeit.timeit(cc, number=1000))
import numpy as np


# # 创建一个numpy数组
# arr1 = np.array([[[1,2,3]], [[2,3, 4]]])
# # 使用copy()方法深度拷贝数组
# arr2 = arr1.copy()
# # 修改arr2中的元素
# arr2[0][0][1] = 5
# # 打印arr1和arr2
# print("arr1:", arr1)
# print("arr2:", arr2)
# def cop2():
# arr1 = np.array([[1, 2], [3, 4]])
# arr2 = arr1
# def cop():
# arr1 = np.array([[1, 2], [3, 4]])
# arr2 = arr1.copy()
# def cop1():
# arr1 = np.array([[1, 2], [3, 4]])
# arr2 = copy.deepcopy(arr1)
# print("static method1: ", timeit.timeit(cop2, number=1000))
# print("static method1: ", timeit.timeit(cop, number=1000))
# print("static method2: ", timeit.timeit(cop1, number=1000))

# aa = {}
# def dd(aa):
# aa["aa"] = 1
# from multiprocessing import Process
#
# dd(aa)
# print(aa)

def aa(num):
while True:
time.sleep(4)
num+= 1
raise Exception("1111")


def dd():
num = 1
with ThreadPoolExecutor(max_workers=1) as t:
ddd = t.submit(aa, num)
while True:
ddd.result()
time.sleep(1)
print(num)

image = cv2.imread(r'D:\tuoheng\codenew\tuoheng_alg\image\logo.png')
image1 = np.array(image)
or_result, or_image = cv2.imencode(".jpg", image1)
aa = or_image.tobytes()
# def fun2(name, pid):
# while True:
# time.sleep(1)
# print("fun2", getpid(), os.getppid(), psutil.Process(getpid()).ppid(), psutil.pid_exists(psutil.Process(getpid()).ppid()))
#
# def fun1(name):
# print('测试%s多进程' %name)
# p = Process(target=fun2,args=('Python',getpid(),)) #实例化进程对象
# # p.daemon = True
# p.start()
# print("funn1", getpid(), p.pid)
#
#
#
# if __name__ == '__main__':
# p = Process(target=fun1,args=('Python',)) #实例化进程对象
# # p.daemon = True
# p.start()
#
#
# print('结束测试', os.getpid())
#
listeningProcesses={"11111": "1111", "2222": "2222"}

numpy_array = np.frombuffer(aa, dtype=np.uint8)
img_bgr = cv2.cvtColor(numpy_array, cv2.COLOR_RGB2BGR)
cv2.sh
print(img_bgr)
p = list(listeningProcesses.keys())
p.reverse()
print(p)

+ 1
- 1
util/CpuUtils.py View File

@@ -13,7 +13,7 @@ def check_cpu(base_dir, requestId=None):
cpu_mem = psutil.virtual_memory().percent
cpu_swap = psutil.swap_memory().percent
cpu_disk = psutil.disk_usage(path).percent
if float(cpu_use) > 65 or float(cpu_mem) > 70 or cpu_swap > 85 or cpu_disk > 90:
if float(cpu_use) > 60 or float(cpu_mem) > 80 or cpu_swap > 85 or cpu_disk > 90:
if requestId:
logger.info("""###############################################################################################
CPU 使用率:{}, 内存使用:{}, SWAP内存使用率:{}, 服务磁盘使用率:{}, requestId:{}

Loading…
Cancel
Save