Browse Source

dsp支持图片和视频优化代码提交

tags/V2.4.0
chenyukun 1 year ago
parent
commit
cbb6414e80
15 changed files with 400 additions and 274 deletions
  1. +21
    -0
      .idea/deployment.xml
  2. +6
    -0
      .idea/encodings.xml
  3. +1
    -0
      .idea/inspectionProfiles/Project_Default.xml
  4. +58
    -59
      .idea/workspace.xml
  5. +104
    -43
      concurrency/IntelligentRecognitionProcess.py
  6. +9
    -6
      concurrency/PullVideoStreamProcess.py
  7. +6
    -0
      enums/ExceptionEnum.py
  8. +65
    -45
      service/Dispatcher.py
  9. +2
    -1
      test/ffmpeg11/ffmpeg11.py
  10. +94
    -94
      test/kafka/producer_start.py
  11. +2
    -2
      test/kafka/producer_stop.py
  12. +0
    -0
      test/while/__init__.py
  13. +9
    -0
      test/while/test.py
  14. +15
    -20
      util/Cv2Utils.py
  15. +8
    -4
      util/ImageUtils.py

+ 21
- 0
.idea/deployment.xml View File

@@ -18,6 +18,13 @@
</mappings>
</serverdata>
</paths>
<paths name="dell@192.168.10.12:22">
<serverdata>
<mappings>
<mapping deploy="/home/chenyukun/algSch" local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="root@212.129.223.66:20653">
<serverdata>
<mappings>
@@ -39,6 +46,20 @@
</mappings>
</serverdata>
</paths>
<paths name="thsw@192.168.10.11:22">
<serverdata>
<mappings>
<mapping deploy="/home/thsw/chenyukun/algSch" local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
<paths name="thsw@212.129.223.66:6000">
<serverdata>
<mappings>
<mapping deploy="/home/thsw/chenyukun/algSch" local="$PROJECT_DIR$" web="/" />
</mappings>
</serverdata>
</paths>
</serverData>
<option name="myAutoUpload" value="ALWAYS" />
</component>

+ 6
- 0
.idea/encodings.xml View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="PROJECT" charset="UTF-8" />
</component>
</project>

+ 1
- 0
.idea/inspectionProfiles/Project_Default.xml View File

@@ -37,6 +37,7 @@
<list>
<option value="N806" />
<option value="N803" />
<option value="N802" />
</list>
</option>
</inspection_tool>

+ 58
- 59
.idea/workspace.xml View File

@@ -5,29 +5,18 @@
</component>
<component name="ChangeListManager">
<list default="true" id="4f7dccd9-8f92-4a6e-90cc-33890d102263" name="Changes" comment="Changes">
<change afterPath="$PROJECT_DIR$/.idea/runConfigurations.xml" afterDir="false" />
<change afterPath="$PROJECT_DIR$/test/ffmpeg11/ffmpeg33.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/deployment.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/deployment.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/inspectionProfiles/Project_Default.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/inspectionProfiles/Project_Default.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/MessagePollingThread.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/MessagePollingThread.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dsp_application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/dsp_application.yml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/dsp_master.py" beforeDir="false" afterPath="$PROJECT_DIR$/dsp_master.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/PullVideoStreamProcess.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/PullVideoStreamProcess.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/enums/ExceptionEnum.py" beforeDir="false" afterPath="$PROJECT_DIR$/enums/ExceptionEnum.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/service/Dispatcher.py" beforeDir="false" afterPath="$PROJECT_DIR$/service/Dispatcher.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/AI.jpg" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/test/ORB算法.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/ORB算法.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/a.jpg" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/test/b.jpg" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/test/d.jpg" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/test/互信息.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/互信息.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/余弦相似度计算.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/余弦相似度计算.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/视频添加图片水印1.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/视频添加图片水印1.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/视频添加文字水印.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/视频添加文字水印.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/视频添加文字水印1.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/视频添加文字水印1.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/视频添加文字水印2.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/视频添加文字水印2.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/视频添加文字水印3.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/水印/视频添加文字水印3.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/ffmpeg11/ffmpeg11.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/ffmpeg11/ffmpeg11.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/kafka/producer_start.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/kafka/producer_start.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/kafka/producer_stop.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/kafka/producer_stop.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/Cv2Utils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/Cv2Utils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/ModelUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/ModelUtils.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/ImageUtils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/ImageUtils.py" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -106,12 +95,12 @@
<option name="branches">
<list>
<RecentBranch>
<option name="branchName" value="develop" />
<option name="lastUsedInstant" value="1663555162" />
<option name="branchName" value="release" />
<option name="lastUsedInstant" value="1668475995" />
</RecentBranch>
<RecentBranch>
<option name="branchName" value="release" />
<option name="lastUsedInstant" value="1663555154" />
<option name="branchName" value="develop" />
<option name="lastUsedInstant" value="1663555162" />
</RecentBranch>
<RecentBranch>
<option name="branchName" value="master" />
@@ -157,7 +146,7 @@
<property name="WebServerToolWindowPanel.toolwindow.show.date" value="false" />
<property name="WebServerToolWindowPanel.toolwindow.show.permissions" value="false" />
<property name="WebServerToolWindowPanel.toolwindow.show.size" value="false" />
<property name="last_opened_file_path" value="$PROJECT_DIR$" />
<property name="last_opened_file_path" value="$PROJECT_DIR$/../../fangke/tuoheng_fk_python" />
<property name="node.js.detected.package.eslint" value="true" />
<property name="node.js.detected.package.tslint" value="true" />
<property name="node.js.selected.package.eslint" value="(autodetect)" />
@@ -181,7 +170,7 @@
<recent name="D:\work\alg\tuoheng_alg\image" />
</key>
</component>
<component name="RunManager" selected="Python.ffmpeg33">
<component name="RunManager" selected="Python.test">
<configuration name="ImageUtils" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" />
@@ -204,20 +193,20 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="KafkaUtils (1)" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="ffmpeg33" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/util" />
<option name="IS_MODULE_SDK" value="true" />
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/ffmpeg11" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/util/KafkaUtils.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/ffmpeg11/ffmpeg33.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@@ -226,20 +215,20 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="KafkaUtils" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="mysqltest" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="sftp://root@212.129.223.66:20653/opt/conda/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/util" />
<option name="IS_MODULE_SDK" value="false" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="/home/DATA/chenyukun/algSch/util/KafkaUtils.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/mysqltest.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@@ -248,20 +237,20 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="ffmpeg33" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="producer_start" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/ffmpeg11" />
<option name="SDK_HOME" value="sftp://thsw@212.129.223.66:6000/home/thsw/anaconda3/envs/chenyukun/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="/home/thsw/chenyukun/algSch" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/ffmpeg11/ffmpeg33.py" />
<option name="SCRIPT_NAME" value="/home/thsw/chenyukun/algSch/test/kafka/producer_start.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@@ -270,20 +259,20 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="mysqltest" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<configuration name="producer_stop" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test" />
<option name="IS_MODULE_SDK" value="true" />
<option name="SDK_HOME" value="sftp://thsw@212.129.223.66:6000/home/thsw/anaconda3/envs/chenyukun/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="/home/thsw/chenyukun/algSch" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/mysqltest.py" />
<option name="SCRIPT_NAME" value="/home/thsw/chenyukun/algSch/test/kafka/producer_stop.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@@ -292,20 +281,20 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="producer_start" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="test" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="sftp://root@212.129.223.66:20653/opt/conda/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test" />
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/while" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="/home/DATA/chenyukun/algSch/test/producer_start.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/while/test.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@@ -316,19 +305,19 @@
</configuration>
<list>
<item itemvalue="Python.mysqltest" />
<item itemvalue="Python.KafkaUtils" />
<item itemvalue="Python.KafkaUtils (1)" />
<item itemvalue="Python.producer_start" />
<item itemvalue="Python.ImageUtils" />
<item itemvalue="Python.ffmpeg33" />
<item itemvalue="Python.producer_stop" />
<item itemvalue="Python.test" />
</list>
<recent_temporary>
<list>
<item itemvalue="Python.test" />
<item itemvalue="Python.producer_stop" />
<item itemvalue="Python.producer_start" />
<item itemvalue="Python.ffmpeg33" />
<item itemvalue="Python.ImageUtils" />
<item itemvalue="Python.producer_start" />
<item itemvalue="Python.KafkaUtils (1)" />
<item itemvalue="Python.KafkaUtils" />
</list>
</recent_temporary>
</component>
@@ -378,6 +367,7 @@
<workItem from="1666436589395" duration="2588000" />
<workItem from="1666568450522" duration="695000" />
<workItem from="1666658084006" duration="458000" />
<workItem from="1668557891343" duration="19808000" />
</task>
<servers />
</component>
@@ -417,25 +407,34 @@
<select />
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/tuoheng_alg$KafkaUtils__1_.coverage" NAME="KafkaUtils (1) Coverage Results" MODIFIED="1663464961001" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg$cv2test1__1_.coverage" NAME="cv2test1 覆盖结果" MODIFIED="1665820653649" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$mysqltest.coverage" NAME="mysqltest Coverage Results" MODIFIED="1660868712851" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$ffmpeg33.coverage" NAME="ffmpeg33 覆盖结果" MODIFIED="1666185710677" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/ffmpeg11" />
<SUITE FILE_PATH="coverage/tuoheng_alg$asnyc.coverage" NAME="asnyc Coverage Results" MODIFIED="1663459033435" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$.coverage" NAME="视频添加图片水印 Coverage Results" MODIFIED="1661873949526" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$1.coverage" NAME="视频添加文字水印1 Coverage Results" MODIFIED="1663381948693" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$asnyc__1_.coverage" NAME="asnyc (1) Coverage Results" MODIFIED="1663458917599" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$producer_start.coverage" NAME="producer_start 覆盖结果" MODIFIED="1665896605019" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/DATA/chenyukun/algSch/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$1.coverage" NAME="协程1 覆盖结果" MODIFIED="1667866542122" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/协程" />
<SUITE FILE_PATH="coverage/tuoheng_alg$.coverage" NAME="字典 覆盖结果" MODIFIED="1668089121018" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/字典" />
<SUITE FILE_PATH="coverage/tuoheng_alg$producer_start__1_.coverage" NAME="producer_start (1) 覆盖结果" MODIFIED="1665832569996" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$dsp_master.coverage" NAME="dsp_master 覆盖结果" MODIFIED="1665892172353" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/DATA/chenyukun/algSch" />
<SUITE FILE_PATH="coverage/tuoheng_alg$5.coverage" NAME="视频添加图片水印5 Coverage Results" MODIFIED="1661905982885" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$KafkaUtils.coverage" NAME="KafkaUtils Coverage Results" MODIFIED="1663465345491" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg$aa1.coverage" NAME="aa1 覆盖结果" MODIFIED="1667351136888" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/ffmpeg11" />
<SUITE FILE_PATH="coverage/tuoheng_alg$2.coverage" NAME="协程2 覆盖结果" MODIFIED="1668066168428" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/opt/tuo_heng/algSch/test/协程/" />
<SUITE FILE_PATH="coverage/tuoheng_alg$4.coverage" NAME="视频添加图片水印4 Coverage Results" MODIFIED="1661874731395" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$producer_start1.coverage" NAME="producer_start1 覆盖结果" MODIFIED="1668437822632" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/thsw/chenyukun/algSch/test/kafka" />
<SUITE FILE_PATH="coverage/tuoheng_alg$KafkaUtils__1_.coverage" NAME="KafkaUtils (1) Coverage Results" MODIFIED="1663464961001" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg$minio.coverage" NAME="minio 覆盖结果" MODIFIED="1667465702864" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/minio1" />
<SUITE FILE_PATH="coverage/tuoheng_alg$producer_start.coverage" NAME="producer_start Coverage Results" MODIFIED="1663466832843" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$producer_start.coverage" NAME="producer_start 覆盖结果" MODIFIED="1668522825199" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/thsw/chenyukun/algSch" />
<SUITE FILE_PATH="coverage/tuoheng_alg$dsp_master.coverage" NAME="dsp_master Coverage Results" MODIFIED="1663403978477" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$ffmpeg11.coverage" NAME="ffmpeg11 覆盖结果" MODIFIED="1668410004435" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/ffmpeg11" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$3.coverage" NAME="协程3 覆盖结果" MODIFIED="1668147029048" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/协程" />
<SUITE FILE_PATH="coverage/tuoheng_alg$read.coverage" NAME="read Coverage Results" MODIFIED="1663640070956" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$ffmpeg22.coverage" NAME="aa 覆盖结果" MODIFIED="1667350492259" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/opt/tuo_heng" />
<SUITE FILE_PATH="coverage/tuoheng_alg$cv2test1.coverage" NAME="cv2test1 覆盖结果" MODIFIED="1665738045603" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/DATA/chenyukun/algSch/test/" />
<SUITE FILE_PATH="coverage/tuoheng_alg$ImageUtils.coverage" NAME="ImageUtils Coverage Results" MODIFIED="1663499421253" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg$2.coverage" NAME="视频添加图片水印2 Coverage Results" MODIFIED="1661875306428" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$4.coverage" NAME="视频添加图片水印4 Coverage Results" MODIFIED="1661874731395" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$TimeUtils.coverage" NAME="TimeUtils Coverage Results" MODIFIED="1661222768678" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$producer_stop.coverage" NAME="producer_stop 覆盖结果" MODIFIED="1668522920533" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/thsw/chenyukun/algSch" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$test.coverage" NAME="test 覆盖结果" MODIFIED="1668577200259" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/while" />
<SUITE FILE_PATH="coverage/tuoheng_alg$3.coverage" NAME="视频添加文字水印3 Coverage Results" MODIFIED="1661906152928" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
</component>
</project>

+ 104
- 43
concurrency/IntelligentRecognitionProcess.py View File

@@ -47,6 +47,25 @@ class IntelligentRecognitionProcess(Process):
self.picture_similarity = self.content["service"]["filter"]["picture_similarity"]
self.similarity = self.content["service"]["filter"]["similarity"]


def clearPullQueue(self):
while True:
if self.pullQueue.qsize() > 0 or self.imageQueue.qsize() > 0:
self.getPullQueue()
self.getImageQueue()
else:
break

def waitPullStream(self, pullProcess):
while True:
self.clearPullQueue()
start = time.time()
pullProcess.join(5)
if time.time() - start >= 5:
self.clearPullQueue()
else:
break

# 给本进程发送事件
def sendEvent(self, eBody):
self.eventQueue.put(eBody)
@@ -70,6 +89,15 @@ class IntelligentRecognitionProcess(Process):
pass
return eBody

def getImageQueue(self):
eBody = None
try:
eBody = self.imageQueue.get(block=False)
return eBody
except Exception as e:
pass
return eBody

# 推送执行结果
def sendResult(self, result):
self.fbQueue.put(result)
@@ -114,12 +142,10 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):

# 停止任务方法
def stop_task(self, cv2tool, pullProcess, snalysisStatus):
# 停止cv2相关配置
cv2tool.close()
pullProcess.sendCommand({"command": "stop_ex"})
if not os.path.exists(self.orFilePath) or not os.path.exists(self.aiFilePath):
logger.error("原视频或AI视频不存在!requestId:{}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[1])
hb = Heartbeat(self.fbQueue, self.msg.get("request_id"), AnalysisType.ONLINE.value)
@@ -140,8 +166,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
ai_play_url, self.msg.get("request_id"))
raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
self.waitPullStream(pullProcess)
hb.sendHbQueue({"command": "stop"})
hb.join(60 * 3)
self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), snalysisStatus,
@@ -157,7 +182,6 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
pullProcess = None
loop = None
try:

# 程序开始时间
LogUtils.init_log(self.content)
self.sendhbMessage(AnalysisStatus.WAITING.value, "0.0000", AnalysisType.ONLINE.value)
@@ -174,36 +198,51 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pullProcess_timeout = None
task_frame = None
self.sendhbMessage(AnalysisStatus.RUNNING.value, "0.0000", AnalysisType.ONLINE.value)
with ThreadPoolExecutor(max_workers=6) as t:
while True:
if not pullProcess.is_alive():
if pullProcess_timeout is None:
pullProcess_timeout = time.time()
if time.time() - pullProcess_timeout > 300:
logger.info("拉流进程停止异常, requestId: {}", self.msg.get("request_id"))
raise Exception("拉流进程异常停止")
logger.info("拉流进程停止异常, requestId: {}", self.msg.get("request_id"))
raise Exception("拉流进程异常停止")
eBody = self.getEvent()
if eBody is not None and len(eBody) > 0:
cmdStr = eBody.get("command")
# 接收到停止指令
if 'stop' == cmdStr:
logger.info("实时任务开始停止, requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_pull_stream"})
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
logger.info("实时任务执行结束指令1:requestId: {}", self.msg.get("request_id"))
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break
frames = []
status = None
if task_frame is not None:
frames, status = task_frame.result()
task_frame = t.submit(buildFrame, self.pullQueue, cv2tool, mod, self.content, self.pic)
if len(frames) == 0 and status is None:
time.sleep(0.02)
continue
if frames is not None and len(frames) > 0:
for result in t.map(process, frames):
if result is not None:
p_result, frame_all, frame_merge = result
task = loop.create_task(cv2tool.push_stream(frame_merge))
task1 = loop.create_task(cv2tool.video_write(frame_all[0].get("frame"), frame_merge))
loop.run_until_complete(asyncio.wait([task, task1]))
r = loop.run_until_complete(asyncio.gather(cv2tool.push_stream(frame_merge),
cv2tool.video_write(frame_all[0].get("frame"), frame_merge)))
if r[0] is None or not r[0]:
t.shutdown(wait=False)
loop.close()
raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
if r[1] is None or not r[1]:
t.shutdown(wait=False)
loop.close()
raise ServiceException(ExceptionType.WRITE_STREAM_EXCEPTION.value[0],
ExceptionType.WRITE_STREAM_EXCEPTION.value[1])
if frame_all[0].get("cct_frame") % 400 == 0:
self.sendhbMessage(AnalysisStatus.RUNNING.value, '', AnalysisType.ONLINE.value)
# # 问题图片加入队列, 暂时写死,后期修改为真实问题
@@ -255,20 +294,21 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_ex"})
pullProcess.join(60 * 3)
t.shutdown(wait=False)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break
if status is None:
continue
if status.get("status") == "1":
t.shutdown(wait=False)
raise ServiceException(status.get("error").get("code"), status.get("error").get("msg"))
elif status.get("status") == "3":
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break
elif status.get("status") == "9":
@@ -276,10 +316,12 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
logger.info("实时任务正常结束:requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break
else:
raise Exception("未知拉流状态异常!")
logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id"))
except ServiceException as s:
logger.error("服务异常,异常编号:{}, 异常描述:{}, requestId: {}", s.code, s.msg, self.msg.get("request_id"))
@@ -302,7 +344,7 @@ class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
loop.close()
if pullProcess is not None and pullProcess.is_alive():
pullProcess.sendCommand({"command": "stop_ex"})
pullProcess.join(60 * 3)
self.waitPullStream(pullProcess)
if feedback:
self.sendResult(feedback)
# 删除本地视频文件
@@ -354,6 +396,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):

def stop_task(self, cv2tool, pullProcess, analysisStatus):
cv2tool.close()
pullProcess.sendCommand({"command": "stop_ex"})
if not os.path.exists(self.aiFilePath):
logger.error("AI视频不存在!requestId:{}", self.msg.get("request_id"))
raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
@@ -371,8 +414,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
self.msg.get("request_id"), ai_play_url)
raise ServiceException(ExceptionType.GET_VIDEO_URL_EXCEPTION.value[0],
ExceptionType.GET_VIDEO_URL_EXCEPTION.value[1])
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(180)
self.waitPullStream(pullProcess)
hb.sendHbQueue({"command": "stop"})
hb.join(180)
self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), analysisStatus,
@@ -400,7 +442,6 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):

high_score_image = {}
# 当前帧数

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
pullProcess_timeout = None
@@ -408,7 +449,6 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
task_frame = None
with ThreadPoolExecutor(max_workers=6) as t:
while True:
start = time.time()
if not pullProcess.is_alive():
if pullProcess_timeout is None:
pullProcess_timeout = time.time()
@@ -421,20 +461,41 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
cmdStr = eBody.get("command")
if 'stop' == cmdStr:
logger.info("离线任务开始停止分析, requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_pull_stream"})
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
logger.info("实时任务执行结束指令2:requestId: {}", self.msg.get("request_id"))
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break

frames = []
status = None
if task_frame is not None:
frames, status = task_frame.result()
task_frame = t.submit(buildFrame, self.pullQueue, cv2tool, mod, self.content, self.pic)
logger.info("帧数:{}, requestId: {}", len(frames), self.msg.get("request_id"))
if len(frames) == 0 and status is None:
time.sleep(0.02)
continue
if len(frames) > 0:
for result in t.map(process, frames):
if result is not None:
p_result, frame_all, frame_merge = result
task = loop.create_task(cv2tool.push_stream(frame_merge))
task1 = loop.create_task(cv2tool.video_write(None, frame_merge))
loop.run_until_complete(asyncio.wait([task, task1]))
# task = loop.create_task(cv2tool.push_stream(frame_merge))
# task1 = loop.create_task(cv2tool.video_write(None, frame_merge))
r = loop.run_until_complete(asyncio.gather(cv2tool.push_stream(frame_merge),
cv2tool.video_write(None, frame_merge)))
if r[0] is None or not r[0]:
t.shutdown(wait=False)
loop.close()
raise ServiceException(ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[0],
ExceptionType.PUSH_STREAMING_CHANNEL_IS_OCCUPIED.value[1])
if r[1] is None or not r[1]:
t.shutdown(wait=False)
loop.close()
raise ServiceException(ExceptionType.WRITE_STREAM_EXCEPTION.value[0],
ExceptionType.WRITE_STREAM_EXCEPTION.value[1])
if frame_all[0].get("cct_frame") % 600 == 0:
task_process = str(format(
float(frame_all[0].get("cct_frame")) / float(frame_all[0].get("all_frame")),
@@ -489,30 +550,30 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_ex"})
pullProcess.join(60 * 3)
t.shutdown(wait=False)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break

logger.info("执行时间1111: {}, 队列大小:{}, requestId: {}", time.time() - start, self.pullQueue.qsize(), self.msg.get("request_id"))
if status is None:
continue
if status.get("status") == "1":
t.shutdown(wait=True)
loop.close()
raise ServiceException(status.get("error").get("code"), status.get("error").get("msg"))
elif status.get("status") == "2":
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break
elif status.get("status") == "3":
if len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break
elif status.get("status") == "9":
@@ -520,8 +581,8 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)})
logger.info("实时任务正常结束:requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3)
t.shutdown(wait=True)
loop.close()
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break
logger.info("离线进程任务完成,requestId:{}", self.msg.get("request_id"))
@@ -546,7 +607,7 @@ class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):
loop.close()
if pullProcess is not None and pullProcess.is_alive():
pullProcess.sendCommand({"command": "stop_ex"})
pullProcess.join(60 * 3)
self.waitPullStream(pullProcess)
if feedback is not None:
self.sendResult(feedback)
# 删除本地视频文件

+ 9
- 6
concurrency/PullVideoStreamProcess.py View File

@@ -80,15 +80,17 @@ class OnlinePullVideoStreamProcess(PullVideoStreamProcess):
cv2tool.close()
continue
if 'stop_image' == body.get("command"):
time.sleep(5)
self.sendImageResult({"command": "stop"})
imageFileUpdate.join(60*3)
logger.error("图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
break
if 'stop_ex' == body.get("command"):
time.sleep(5)
self.sendImageResult({"command": "stop"})
imageFileUpdate.join(60*3)
self.pullQueue.cancel_join_thread()
logger.error("图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
# self.pullQueue.cancel_join_thread()
logger.error("拉流、图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
break
if stop_imageFile:
time.sleep(1)
@@ -157,7 +159,7 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
cv2tool = None
imageFileUpdate = None
try:
imageFileUpdate = ImageFileUpdate(self.fbQueue, self.content, self.msg, self.imageQueue, AnalysisType.ONLINE.value)
imageFileUpdate = ImageFileUpdate(self.fbQueue, self.content, self.msg, self.imageQueue, AnalysisType.OFFLINE.value)
imageFileUpdate.setDaemon(True)
imageFileUpdate.start()
cv2tool = Cv2Util(pullUrl=self.msg.get('original_url'), requestId=self.msg.get("request_id"))
@@ -186,13 +188,14 @@ class OfflinePullVideoStreamProcess(PullVideoStreamProcess):
if 'stop_image' == body.get("command"):
self.sendImageResult({"command": "stop"})
imageFileUpdate.join(60*3)
logger.error("图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
logger.info("图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
break
if 'stop_ex' == body.get("command"):
time.sleep(5)
self.sendImageResult({"command": "stop"})
imageFileUpdate.join(60*3)
self.pullQueue.cancel_join_thread()
logger.error("图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
# self.pullQueue.cancel_join_thread()
logger.info("图片线程停止完成, reuqestId:{}", self.msg.get("request_id"))
break
if stop_imageFile:
time.sleep(1)

+ 6
- 0
enums/ExceptionEnum.py View File

@@ -55,4 +55,10 @@ class ExceptionType(Enum):

MODEL_ANALYSIS_EXCEPTION = ("SP024", "Model Analysis Exception!")

PUSH_STREAMING_CHANNEL_IS_OCCUPIED = ("SP025", "推流通道被占用, 请稍后再试!")

WRITE_STREAM_EXCEPTION = ("SP026", "视频写流异常, 请联系工程师定位处理!")

NO_GPU_RESOURCES = ("SP998", "暂无GPU资源可以使用,请稍后再试!")

SERVICE_INNER_EXCEPTION = ("SP999", "系统内部异常, 请联系工程师定位处理!")

+ 65
- 45
service/Dispatcher.py View File

@@ -42,9 +42,9 @@ class DispatcherService:
self.image_topic = self.content["kafka"]["topic"]["dsp-alg-image-tasks-topic"]
self.topics = [self.online_topic, self.offline_topic, self.image_topic]
self.analysisType = {
self.online_topic: (AnalysisType.ONLINE.value, lambda x, y: self.online(x, y)),
self.offline_topic: (AnalysisType.OFFLINE.value, lambda x, y: self.offline(x, y)),
self.image_topic: (AnalysisType.IMAGE.value, lambda x, y: self.image(x, y))
self.online_topic: (AnalysisType.ONLINE.value, lambda x: self.online(x)),
self.offline_topic: (AnalysisType.OFFLINE.value, lambda x: self.offline(x)),
self.image_topic: (AnalysisType.IMAGE.value, lambda x: self.image(x))
}

# 服务调用启动方法
@@ -63,47 +63,54 @@ class DispatcherService:
logger.error("======================问题反馈线程异常停止======================")
break
# 获取当前可用gpu使用数量
gpu_ids = GPUtils.get_gpu_ids(self.content)
if gpu_ids is not None and len(gpu_ids) > 0:
msg = customerKafkaConsumer.poll()
if msg is not None and len(msg) > 0:
for k, v in msg.items():
for m in v:
message = m.value
analysisType = self.analysisType.get(m.topic)[0]
try:
customerKafkaConsumer.commit_offset(m)
logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
m.topic, m.offset, m.partition, message, message.get("request_id"))
self.analysisType.get(m.topic)[1](message, gpu_ids)
except ServiceException as s:
logger.exception("消息监听异常:{}, requestId: {}", s.msg, message.get("request_id"))
if analysisType is not None:
feedback = {
"feedback": message_feedback(message.get("request_id"),
AnalysisStatus.FAILED.value,
analysisType,
s.code,
s.msg,
analyse_time=TimeUtils.now_date_to_str())}
self.fbQueue.put(message, feedback)
except Exception as e:
logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
if analysisType is not None:
feedback = {
"feedback": message_feedback(message.get("request_id"),
AnalysisStatus.FAILED.value,
analysisType,
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
analyse_time=TimeUtils.now_date_to_str())}
self.fbQueue.put(message, feedback)
else:
time.sleep(1)
msg = customerKafkaConsumer.poll()
if msg is not None and len(msg) > 0:
for k, v in msg.items():
for m in v:
message = m.value
analysisType = self.analysisType.get(m.topic)[0]
try:
customerKafkaConsumer.commit_offset(m)
logger.info("当前拉取到的消息, topic:{}, offset:{}, partition: {}, body: {}, requestId:{}",
m.topic, m.offset, m.partition, message, message.get("request_id"))
self.analysisType.get(m.topic)[1](message)
except ServiceException as s:
logger.exception("消息监听异常:{}, requestId: {}", s.msg, message.get("request_id"))
if analysisType is not None:
feedback = {
"feedback": message_feedback(message.get("request_id"),
AnalysisStatus.FAILED.value,
analysisType,
s.code,
s.msg,
analyse_time=TimeUtils.now_date_to_str())}
self.fbQueue.put(message, feedback)
except Exception as e:
logger.exception("消息监听异常:{}, requestId: {}", e, message.get("request_id"))
if analysisType is not None:
feedback = {
"feedback": message_feedback(message.get("request_id"),
AnalysisStatus.FAILED.value,
analysisType,
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
analyse_time=TimeUtils.now_date_to_str())}
self.fbQueue.put(message, feedback)
else:
logger.info("当前可用gpu数量: {}", gpu_ids)
GPUtil.showUtilization()
time.sleep(1)

def checkGPU(self, msgId):
gpu_ids = None
while True:
GPUtil.showUtilization()
gpu_ids = GPUtils.get_gpu_ids(self.content)
if gpu_ids is None or len(gpu_ids) == 0:
logger.warning("暂无可用GPU资源,5秒后重试, 可用gpu数: {}, msgId: {}", len(gpu_ids), msgId)
time.sleep(5)
continue
else:
break
return gpu_ids

# 开启实时进程
def startOnlineProcess(self, msg, gpu_ids):
@@ -263,26 +270,35 @@ class DispatcherService:
feedbackThread.start()
return feedbackThread

def online(self, message, gpu_ids):
def online(self, message):

check_result = self.check_online_msg(message)
if not check_result:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
if 'start' == message.get("command"):
logger.info("开始实时分析")
gpu_ids = GPUtils.get_gpu_ids(self.content)
if gpu_ids is None or len(gpu_ids) == 0:
raise ServiceException(ExceptionType.NO_GPU_RESOURCES.value[0],
ExceptionType.NO_GPU_RESOURCES.value[1])
self.startOnlineProcess(message, gpu_ids)
elif 'stop' == message.get("command"):
self.stopOnlineProcess(message)
else:
pass

def offline(self, message, gpu_ids):
def offline(self, message):
check_result = self.check_offline_msg(message)
if not check_result:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
if 'start' == message.get("command"):
logger.info("开始离线分析")
gpu_ids = GPUtils.get_gpu_ids(self.content)
if gpu_ids is None or len(gpu_ids) == 0:
raise ServiceException(ExceptionType.NO_GPU_RESOURCES.value[0],
ExceptionType.NO_GPU_RESOURCES.value[1])
self.startOfflineProcess(message, gpu_ids)
time.sleep(3)
elif 'stop' == message.get("command"):
@@ -290,13 +306,17 @@ class DispatcherService:
else:
pass

def image(self, message, gpu_ids):
def image(self, message):
check_result = self.check_image_msg(message)
if not check_result:
raise ServiceException(ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[0],
ExceptionType.ILLEGAL_PARAMETER_FORMAT.value[1])
if 'start' == message.get("command"):
logger.info("开始图片分析")
gpu_ids = GPUtils.get_gpu_ids(self.content)
if gpu_ids is None or len(gpu_ids) == 0:
raise ServiceException(ExceptionType.NO_GPU_RESOURCES.value[0],
ExceptionType.NO_GPU_RESOURCES.value[1])
self.startImageProcess(message, gpu_ids)
# elif 'stop' == message.get("command"):
# self.stopImageProcess(message)

+ 2
- 1
test/ffmpeg11/ffmpeg11.py View File

@@ -94,7 +94,8 @@ if __name__ == '__main__':
width = int(video_info['width'])
height = int(video_info['height'])
command = ['ffmpeg',
'-vcodec', 'h264_cuvid',
'-hwaccel cuda',
'-c:v', 'h264_cuvid',
'-resize', '1920x1080',
# '-hwaccel_output_format', 'bgr24',
'-i', file_path,

+ 94
- 94
test/kafka/producer_start.py View File

@@ -6,48 +6,48 @@ from kafka import KafkaProducer
import json
import threading

# topicName = 'dsp-alg-online-tasks'
# eBody = {
# "request_id": "d4c909912ac741ce81ccef03fd1b2ec45",
# "models": [
# {
# "code": "001",
# "categories": [{
# "id": "0",
# "config": {}
# },
# {
# "id": "1",
# "config": {}
# },
# {
# "id": "2",
# "config": {}
# },
# {
# "id": "3",
# "config": {}
# },
# {
# "id": "4",
# "config": {}
# },
# {
# "id": "5",
# "config": {}
# }
# ]
# }],
# "command": "start",
# "pull_url": "rtmp://live.play.t-aaron.com/live/THSAh",
# "push_url": "rtmp://live.push.t-aaron.com/live/THSAg",
# "results_base_dir": "P20220802133841159"
# }
# producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'],
# value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody)
# result = future.get(timeout=10)
# print(result)
topicName = 'dsp-alg-online-tasks'
eBody = {
"request_id": "d4c909912ac741ce81ccef03fd1b2ec45",
"models": [
{
"code": "001",
"categories": [{
"id": "0",
"config": {}
},
{
"id": "1",
"config": {}
},
{
"id": "2",
"config": {}
},
{
"id": "3",
"config": {}
},
{
"id": "4",
"config": {}
},
{
"id": "5",
"config": {}
}
]
}],
"command": "start",
"pull_url": "rtmp://live.play.t-aaron.com/live/THSAr",
"push_url": "rtmp://live.push.t-aaron.com/live/THSAs",
"results_base_dir": "P20220802133841159"
}
producer = KafkaProducer(bootstrap_servers=['192.168.11.242:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody)
result = future.get(timeout=10)
print(result)

# topicName = 'dsp-alg-image-tasks'
# eBody = {
@@ -95,55 +95,55 @@ import threading
# "https://image.t-aaron.com/P20221112103326614/2022-11-12-10-35-09_frame-1785-2085_20221112103509952824-offline-P20221112103326614-eb4467a3fe8f405ebf4c44f1b48a7e4b_OR.jpg"],
# "results_base_dir": "P20220802133841159"
# }
topicName = 'dsp-alg-offline-tasks'
eBody = {
"request_id": "d4c909912ac741ce81ccef03fd1b2ec46",
"models": [
{
"code": "001",
"categories": [
{
"id": "0",
"config": {}
},
{
"id": "1",
"config": {}
},
{
"id": "2",
"config": {}
},
{
"id": "3",
"config": {}
},
{
"id": "4",
"config": {}
},
{
"id": "5",
"config": {}
},
{
"id": "6",
"config": {}
},
{
"id": "7",
"config": {}
}
]
}],
"command": "start",
"original_url": "https://vod.play.t-aaron.com/0bc905ef5651439da2bfba8427fe467e/a76a7ebb6e3b44ef9c0c7820c7e9c574-f2d7ee90cba11aa91971d58e06d295d2-4k.mp4",
"original_type": ".mp4",
"push_url": "rtmp://live.push.t-aaron.com/live/THSAr",
"results_base_dir": "P20220802133841159"
}
producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec46', value=eBody)
result = future.get(timeout=10)
print(result)
# topicName = 'dsp-alg-offline-tasks'
# eBody = {
# "request_id": "d4c909912ac741ce81ccef03fd1b2ec46",
# "models": [
# {
# "code": "001",
# "categories": [
# {
# "id": "0",
# "config": {}
# },
# {
# "id": "1",
# "config": {}
# },
# {
# "id": "2",
# "config": {}
# },
# {
# "id": "3",
# "config": {}
# },
# {
# "id": "4",
# "config": {}
# },
# {
# "id": "5",
# "config": {}
# },
# {
# "id": "6",
# "config": {}
# },
# {
# "id": "7",
# "config": {}
# }
# ]
# }],
# "command": "start",
# "original_url": "https://vod.play.t-aaron.com/0bc905ef5651439da2bfba8427fe467e/a76a7ebb6e3b44ef9c0c7820c7e9c574-f2d7ee90cba11aa91971d58e06d295d2-4k.mp4",
# "original_type": ".mp4",
# "push_url": "rtmp://live.push.t-aaron.com/live/THSAr",
# "results_base_dir": "P20220802133841159"
# }
# producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'],
# value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec46', value=eBody)
# result = future.get(timeout=10)
# print(result)

+ 2
- 2
test/kafka/producer_stop.py View File

@@ -6,12 +6,12 @@ from kafka import KafkaProducer
import json
import threading

topicName = 'dsp-alg-command'
topicName = 'dsp-alg-online-tasks'
eBody = {
"request_id": "d4c909912ac741ce81ccef03fd1b2ec45",
"command": "stop"
}
producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'],
producer = KafkaProducer(bootstrap_servers=['192.168.11.242:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody)
result = future.get(timeout=10)

+ 0
- 0
test/while/__init__.py View File


+ 9
- 0
test/while/test.py View File

@@ -0,0 +1,9 @@

def aa():
while True:

print("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
return True


aa()

+ 15
- 20
util/Cv2Utils.py View File

@@ -336,33 +336,26 @@ class Cv2Util():
self.build_p()
try:
await self.push_stream_write(frame)
return True
except Exception as ex:
logger.exception("推流进管道异常:{}, requestId: {}", ex, self.requestId)
current_retry_num = 0
while True:
try:
if self.p_push_retry_num > 20:
logger.info("推流失败重试次数过多, 请检查相关配置信息, 当前重试次数: {}, requestId: {}",
self.p_push_retry_num, self.requestId)
current_retry_num = 4
break
self.p_push_retry_num += 1
time.sleep(10)
current_retry_num += 1
if current_retry_num > 3 or self.p_push_retry_num > 10:
return False
time.sleep(1)
self.build_p()
await self.push_stream_write(frame)
logger.info("构建p管道重试成功, 当前重试次数: {}, requestId: {}", current_retry_num,
self.requestId)
break
return True
except Exception as e:
current_retry_num += 1
logger.exception("构建p管道异常:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
current_retry_num, self.requestId)
if current_retry_num > 3:
logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.requestId)
break
if current_retry_num > 3:
raise ServiceException(ExceptionType.PUSH_STREAM_URL_E_EXCEPTION.value[0],
ExceptionType.PUSH_STREAM_URL_E_EXCEPTION.value[1])
return False

async def video_frame_write(self, or_frame, ai_frame):
if or_frame is not None:
@@ -377,25 +370,27 @@ class Cv2Util():
await self.video_frame_write(or_frame, None)
if ai_frame is not None and len(ai_frame) > 0:
await self.video_frame_write(None, ai_frame)
return True
except Exception as ex:
ai_retry_num = 0
while True:
try:
time.sleep(1)
ai_retry_num += 1
if ai_retry_num > 3:
logger.exception("重新写入离线分析后视频到本地,重试失败:{}, requestId: {}", e, self.requestId)
return False
if or_frame is not None and len(or_frame) > 0:
await self.or_video_file.write(or_frame)
if ai_frame is not None and len(ai_frame) > 0:
await self.ai_video_file.write(ai_frame)
logger.info("重新写入离线分析后视频到本地, 当前重试次数: {}, requestId: {}", ai_retry_num,
self.requestId)
break
return True
except Exception as e:
ai_retry_num += 1
logger.exception("重新写入离线分析后视频到本地:{}, 开始重试, 当前重试次数:{}, requestId: {}", e,
ai_retry_num, self.requestId)
if ai_retry_num > 3:
logger.exception("重新写入离线分析后视频到本地,重试失败:{}, requestId: {}", e, self.requestId)
raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1])


def build_write(self):
try:

+ 8
- 4
util/ImageUtils.py View File

@@ -135,18 +135,22 @@ class PictureWaterMark:

def common_water_1(self, image, logo, alpha=1):
h, w = image.shape[0], image.shape[1]
if w >= h:
rate = int(w * 0.1) / logo.shape[1]
else:
rate = int(h * 0.1) / logo.shape[0]
# if w >= h:
rate = int(w * 0.1) / logo.shape[1]
# else:
# rate = int(h * 0.1) / logo.shape[0]
mask = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
mask_h, mask_w = mask.shape[0], mask.shape[1]
mask_channels = cv2.split(mask)
dst_channels = cv2.split(image)
# b, g, r, a = cv2.split(mask)
# 计算mask在图片的坐标
# if w >= h:
ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w))
dr_points = (int(h * 0.95), int(w - h * 0.05))
# else:
# ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w))
# dr_points = (int(h * 0.95), int(w - h * 0.05))
for i in range(3):
dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] = dst_channels[i][
ul_points[0]: dr_points[0],

Loading…
Cancel
Save