Browse Source

dsp支持图片分析

develop_back
chenyukun 1 year ago
parent
commit
c98ed90840
6 changed files with 257 additions and 146 deletions
  1. +10
    -3
      .idea/deployment.xml
  2. +45
    -36
      .idea/workspace.xml
  3. +102
    -85
      concurrency/IntelligentRecognitionProcess.py
  4. +16
    -8
      test/ffmpeg11/ffmpeg11.py
  5. +53
    -6
      test/kafka/producer_start.py
  6. +31
    -8
      util/Cv2Utils.py

+ 10
- 3
.idea/deployment.xml View File

<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="PublishConfigData" autoUpload="Always" serverName="thsw@192.168.10.11:22" remoteFilesAllowedToDisappearOnAutoupload="false">
<component name="PublishConfigData" autoUpload="Always" serverName="thsw@212.129.223.66:6000" remoteFilesAllowedToDisappearOnAutoupload="false">
<serverData> <serverData>
<paths name="10.11"> <paths name="10.11">
<serverdata> <serverdata>
<paths name="66:6000"> <paths name="66:6000">
<serverdata> <serverdata>
<mappings> <mappings>
<mapping deploy="/home/thsw/tuo_heng/prod/algSch" local="$PROJECT_DIR$" web="/" />
<mapping deploy="/home/thsw/chenyukun/algSch" local="$PROJECT_DIR$" web="/" />
</mappings> </mappings>
</serverdata> </serverdata>
</paths> </paths>
<paths name="dell@192.168.10.12:22"> <paths name="dell@192.168.10.12:22">
<serverdata> <serverdata>
<mappings> <mappings>
<mapping deploy="/opt/tuo_heng/algSch" local="$PROJECT_DIR$" web="/" />
<mapping deploy="/home/chenyukun/algSch" local="$PROJECT_DIR$" web="/" />
</mappings> </mappings>
</serverdata> </serverdata>
</paths> </paths>
</mappings> </mappings>
</serverdata> </serverdata>
</paths> </paths>
<paths name="thsw@212.129.223.66:6000">
<serverdata>
<mappings>
<mapping deploy="/home/thsw/chenyukun/algSch" local="$PROJECT_DIR$" />
</mappings>
</serverdata>
</paths>
</serverData> </serverData>
<option name="myAutoUpload" value="ALWAYS" /> <option name="myAutoUpload" value="ALWAYS" />
</component> </component>

+ 45
- 36
.idea/workspace.xml View File

</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="4f7dccd9-8f92-4a6e-90cc-33890d102263" name="Changes" comment="Changes"> <list default="true" id="4f7dccd9-8f92-4a6e-90cc-33890d102263" name="Changes" comment="Changes">
<change beforePath="$PROJECT_DIR$/.idea/deployment.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/deployment.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" afterDir="false" /> <change beforePath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" beforeDir="false" afterPath="$PROJECT_DIR$/concurrency/IntelligentRecognitionProcess.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/ffmpeg11/ffmpeg11.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/ffmpeg11/ffmpeg11.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/test/kafka/producer_start.py" beforeDir="false" afterPath="$PROJECT_DIR$/test/kafka/producer_start.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/util/Cv2Utils.py" beforeDir="false" afterPath="$PROJECT_DIR$/util/Cv2Utils.py" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
<recent name="D:\work\alg\tuoheng_alg\test\字典" /> <recent name="D:\work\alg\tuoheng_alg\test\字典" />
</key> </key>
</component> </component>
<component name="RunManager" selected="Python.producer_start">
<configuration name="协程" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<component name="RunManager" selected="Python.producer_start1">
<configuration name="字典" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" /> <option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/协程" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/字典" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/协程/协程.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/字典/字典.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="协程1" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="协程2" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs> <envs>
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/协程" />
<option name="SDK_HOME" value="sftp://dell@192.168.10.12:22/home/dell/anaconda3/envs/prod/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="/opt/tuo_heng/algSch/test/协程/" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/协程/协程1.py" />
<option name="SCRIPT_NAME" value="/opt/tuo_heng/algSch/test/协程/协程2.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="协程2" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="协程3" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs> <envs>
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="sftp://dell@192.168.10.12:22/home/dell/anaconda3/envs/prod/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="/opt/tuo_heng/algSch/test/协程/" />
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/协程" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="/opt/tuo_heng/algSch/test/协程/协程2.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/协程/协程3.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="协程3" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="装饰器" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" /> <option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/协程" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/装饰器" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/协程/协程3.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/装饰器/装饰器.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="字典" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="ffmpeg11" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" /> <option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/字典" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/ffmpeg11" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/字典/字典.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/ffmpeg11/ffmpeg11.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="装饰器" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="mysqltest" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs> <envs>
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="$PROJECT_DIR$/../../../software/anaconda/envs/test/python.exe" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test/装饰器" />
<option name="IS_MODULE_SDK" value="false" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/装饰器/装饰器.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/mysqltest.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="mysqltest" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<configuration name="producer_start" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs> <envs>
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test" />
<option name="IS_MODULE_SDK" value="true" />
<option name="SDK_HOME" value="sftp://thsw@192.168.10.11:22/home/thsw/anaconda3/envs/chenyukun/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="/home/thsw/chenyukun/algSch/test/kafka" />
<option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" /> <option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" /> <EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/test/mysqltest.py" />
<option name="SCRIPT_NAME" value="/home/thsw/chenyukun/algSch/test/kafka/producer_start.py" />
<option name="PARAMETERS" value="" /> <option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" /> <option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" /> <option name="EMULATE_TERMINAL" value="false" />
<option name="INPUT_FILE" value="" /> <option name="INPUT_FILE" value="" />
<method v="2" /> <method v="2" />
</configuration> </configuration>
<configuration name="producer_start" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="producer_start1" type="PythonConfigurationType" factoryName="Python" temporary="true">
<module name="tuoheng_alg" /> <module name="tuoheng_alg" />
<option name="INTERPRETER_OPTIONS" value="" /> <option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" /> <option name="PARENT_ENVS" value="true" />
<envs> <envs>
<env name="PYTHONUNBUFFERED" value="1" /> <env name="PYTHONUNBUFFERED" value="1" />
</envs> </envs>
<option name="SDK_HOME" value="sftp://thsw@192.168.10.11:22/home/thsw/anaconda3/envs/chenyukun/bin/python3.8" />
<option name="SDK_HOME" value="sftp://thsw@212.129.223.66:6000/home/thsw/anaconda3/envs/chenyukun/bin/python3.8" />
<option name="WORKING_DIRECTORY" value="/home/thsw/chenyukun/algSch/test/kafka" /> <option name="WORKING_DIRECTORY" value="/home/thsw/chenyukun/algSch/test/kafka" />
<option name="IS_MODULE_SDK" value="false" /> <option name="IS_MODULE_SDK" value="false" />
<option name="ADD_CONTENT_ROOTS" value="true" /> <option name="ADD_CONTENT_ROOTS" value="true" />
</configuration> </configuration>
<list> <list>
<item itemvalue="Python.mysqltest" /> <item itemvalue="Python.mysqltest" />
<item itemvalue="Python.协程" />
<item itemvalue="Python.协程1" />
<item itemvalue="Python.producer_start" />
<item itemvalue="Python.字典" />
<item itemvalue="Python.协程2" /> <item itemvalue="Python.协程2" />
<item itemvalue="Python.协程3" /> <item itemvalue="Python.协程3" />
<item itemvalue="Python.producer_start" />
<item itemvalue="Python.ffmpeg11" />
<item itemvalue="Python.producer_start1" />
</list> </list>
<recent_temporary> <recent_temporary>
<list> <list>
<item itemvalue="Python.producer_start1" />
<item itemvalue="Python.ffmpeg11" />
<item itemvalue="Python.producer_start" /> <item itemvalue="Python.producer_start" />
<item itemvalue="Python.协程3" /> <item itemvalue="Python.协程3" />
<item itemvalue="Python.协程2" /> <item itemvalue="Python.协程2" />
<item itemvalue="Python.协程1" />
<item itemvalue="Python.协程" />
</list> </list>
</recent_temporary> </recent_temporary>
</component> </component>
<workItem from="1668038178569" duration="29273000" /> <workItem from="1668038178569" duration="29273000" />
<workItem from="1668132063668" duration="26016000" /> <workItem from="1668132063668" duration="26016000" />
<workItem from="1668211745874" duration="22074000" /> <workItem from="1668211745874" duration="22074000" />
<workItem from="1668386981201" duration="683000" />
<workItem from="1668386981201" duration="40563000" />
<workItem from="1668470975347" duration="2790000" />
</task> </task>
<servers /> <servers />
</component> </component>
<SUITE FILE_PATH="coverage/tuoheng_alg$.coverage" NAME="字典 覆盖结果" MODIFIED="1668089121018" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/字典" /> <SUITE FILE_PATH="coverage/tuoheng_alg$.coverage" NAME="字典 覆盖结果" MODIFIED="1668089121018" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/字典" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$producer_start.coverage" NAME="producer_start 覆盖结果" MODIFIED="1668236333176" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/thsw/chenyukun/algSch/test/kafka" /> <SUITE FILE_PATH="coverage/tuoheng_alg___$producer_start.coverage" NAME="producer_start 覆盖结果" MODIFIED="1668236333176" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/thsw/chenyukun/algSch/test/kafka" />
<SUITE FILE_PATH="coverage/tuoheng_alg$dsp_master.coverage" NAME="dsp_master Coverage Results" MODIFIED="1663403978477" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" /> <SUITE FILE_PATH="coverage/tuoheng_alg$dsp_master.coverage" NAME="dsp_master Coverage Results" MODIFIED="1663403978477" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$ffmpeg11.coverage" NAME="ffmpeg11 覆盖结果" MODIFIED="1668410004435" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/ffmpeg11" />
<SUITE FILE_PATH="coverage/tuoheng_alg$5.coverage" NAME="视频添加图片水印5 Coverage Results" MODIFIED="1661905982885" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" /> <SUITE FILE_PATH="coverage/tuoheng_alg$5.coverage" NAME="视频添加图片水印5 Coverage Results" MODIFIED="1661905982885" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$KafkaUtils.coverage" NAME="KafkaUtils Coverage Results" MODIFIED="1663465345491" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" /> <SUITE FILE_PATH="coverage/tuoheng_alg$KafkaUtils.coverage" NAME="KafkaUtils Coverage Results" MODIFIED="1663465345491" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$3.coverage" NAME="协程3 覆盖结果" MODIFIED="1668147029048" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/协程" /> <SUITE FILE_PATH="coverage/tuoheng_alg___$3.coverage" NAME="协程3 覆盖结果" MODIFIED="1668147029048" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test/协程" />
<SUITE FILE_PATH="coverage/tuoheng_alg$2.coverage" NAME="协程2 覆盖结果" MODIFIED="1668066168428" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/opt/tuo_heng/algSch/test/协程/" /> <SUITE FILE_PATH="coverage/tuoheng_alg$2.coverage" NAME="协程2 覆盖结果" MODIFIED="1668066168428" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/opt/tuo_heng/algSch/test/协程/" />
<SUITE FILE_PATH="coverage/tuoheng_alg$4.coverage" NAME="视频添加图片水印4 Coverage Results" MODIFIED="1661874731395" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" /> <SUITE FILE_PATH="coverage/tuoheng_alg$4.coverage" NAME="视频添加图片水印4 Coverage Results" MODIFIED="1661874731395" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
<SUITE FILE_PATH="coverage/tuoheng_alg$TimeUtils.coverage" NAME="TimeUtils Coverage Results" MODIFIED="1661222768678" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" /> <SUITE FILE_PATH="coverage/tuoheng_alg$TimeUtils.coverage" NAME="TimeUtils Coverage Results" MODIFIED="1661222768678" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/util" />
<SUITE FILE_PATH="coverage/tuoheng_alg___$producer_start1.coverage" NAME="producer_start1 覆盖结果" MODIFIED="1668437822632" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="/home/thsw/chenyukun/algSch/test/kafka" />
<SUITE FILE_PATH="coverage/tuoheng_alg$3.coverage" NAME="视频添加文字水印3 Coverage Results" MODIFIED="1661906152928" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" /> <SUITE FILE_PATH="coverage/tuoheng_alg$3.coverage" NAME="视频添加文字水印3 Coverage Results" MODIFIED="1661906152928" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test" />
</component> </component>
</project> </project>

+ 102
- 85
concurrency/IntelligentRecognitionProcess.py View File

self.fbQueue = cfg.get("fbQueue") self.fbQueue = cfg.get("fbQueue")
self.eventQueue = Queue() self.eventQueue = Queue()
self.imageQueue = Queue() self.imageQueue = Queue()
self.pullQueue = Queue(150)
self.pullQueue = Queue(120)
self.content = cfg.get("content") self.content = cfg.get("content")
self.msg = cfg.get("msg") self.msg = cfg.get("msg")
self.gpu_ids = cfg.get("gpu_ids") self.gpu_ids = cfg.get("gpu_ids")


# 获取下一个事件 # 获取下一个事件
def getEvent(self): def getEvent(self):
eBody = None
try: try:
eBody = self.eventQueue.get(block=False) eBody = self.eventQueue.get(block=False)
return eBody return eBody
except Exception as e: except Exception as e:
pass pass
return eBody


def getPullQueue(self): def getPullQueue(self):
eBody = None
try: try:
eBody = self.pullQueue.get(block=False) eBody = self.pullQueue.get(block=False)
return eBody return eBody
except Exception as e: except Exception as e:
pass pass
return eBody


# 推送执行结果 # 推送执行结果
def sendResult(self, result): def sendResult(self, result):
''' '''
实时任务进程 实时任务进程
''' '''


def process(frame): def process(frame):
try: try:
p_result, timeOut = frame[1].process(copy.deepcopy(frame[0].get("frame")), int(frame[0].get("width") / 2)) p_result, timeOut = frame[1].process(copy.deepcopy(frame[0].get("frame")), int(frame[0].get("width") / 2))
self.msg.get("request_id"), ".mp4") self.msg.get("request_id"), ".mp4")
self.pull_stream_timeout = int(self.content["service"]["cv2_pull_stream_timeout"]) self.pull_stream_timeout = int(self.content["service"]["cv2_pull_stream_timeout"])





# 停止任务方法 # 停止任务方法
def stop_task(self, cv2tool, pullProcess, snalysisStatus): def stop_task(self, cv2tool, pullProcess, snalysisStatus):
# 停止cv2相关配置 # 停止cv2相关配置
pullProcess.sendCommand({"command": "stop_image"}) pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60 * 3) pullProcess.join(60 * 3)
hb.sendHbQueue({"command": "stop"}) hb.sendHbQueue({"command": "stop"})
hb.join(60*3)
hb.join(60 * 3)
self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), snalysisStatus, self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), snalysisStatus,
AnalysisType.ONLINE.value, AnalysisType.ONLINE.value,
progress=Constant.success_progess, progress=Constant.success_progess,
self.sendhbMessage(AnalysisStatus.WAITING.value, "0.0000", AnalysisType.ONLINE.value) self.sendhbMessage(AnalysisStatus.WAITING.value, "0.0000", AnalysisType.ONLINE.value)
mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"])) mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"]))
# 结果反馈进程启动 # 结果反馈进程启动
pullProcess = OnlinePullVideoStreamProcess(self.msg, self.content, self.pullQueue, self.fbQueue, self.imageQueue)
pullProcess = OnlinePullVideoStreamProcess(self.msg, self.content, self.pullQueue, self.fbQueue,
self.imageQueue)
pullProcess.daemon = True pullProcess.daemon = True
pullProcess.start() pullProcess.start()
cv2tool = Cv2Util(None, self.msg.get('push_url'), self.orFilePath, self.aiFilePath, cv2tool = Cv2Util(None, self.msg.get('push_url'), self.orFilePath, self.aiFilePath,
self.msg.get("request_id")) self.msg.get("request_id"))

high_score_image = {} high_score_image = {}
self.sendhbMessage(AnalysisStatus.RUNNING.value, "0.0000", AnalysisType.ONLINE.value)
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
pullProcess_timeout = None pullProcess_timeout = None
with ThreadPoolExecutor(max_workers=10) as t:
task_frame = None
self.sendhbMessage(AnalysisStatus.RUNNING.value, "0.0000", AnalysisType.ONLINE.value)
with ThreadPoolExecutor(max_workers=6) as t:
while True: while True:
if not pullProcess.is_alive(): if not pullProcess.is_alive():
if pullProcess_timeout is None: if pullProcess_timeout is None:
if 'stop' == cmdStr: if 'stop' == cmdStr:
logger.info("实时任务开始停止, requestId: {}", self.msg.get("request_id")) logger.info("实时任务开始停止, requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_pull_stream"}) pullProcess.sendCommand({"command": "stop_pull_stream"})
if self.pullQueue.qsize() == 0:
logger.info("重试, 拉流队列大小: {},requestId: {}", self.pullQueue.qsize(), self.msg.get("request_id"))
time.sleep(0.1)
continue
frames = [] frames = []
status = None status = None
for i in range(self.pullQueue.qsize()):
frame_result = self.getPullQueue()
if frame_result is None:
continue
if frame_result.get("status") == '4':
if cv2tool.fps is None:
cv2tool.fps = int(frame_result.get("fps"))
cv2tool.width = int(frame_result.get("width"))
cv2tool.height = int(frame_result.get("height"))
frames.append((frame_result, mod, self.content, self.pic, cv2tool))
else:
status = frame_result
if len(frames) > 0:
if task_frame is not None:
frames, status = task_frame.result()
task_frame = t.submit(buildFrame, self.pullQueue, cv2tool, mod, self.content, self.pic)
if frames is not None and len(frames) > 0:
for result in t.map(process, frames): for result in t.map(process, frames):
if result is not None: if result is not None:
p_result, frame_all, frame_merge = result p_result, frame_all, frame_merge = result
for key in list(high_score_image.keys()): for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)}) self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_image"}) pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60*3)
pullProcess.join(60 * 3)
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value) self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break break
elif status.get("status") == "9": elif status.get("status") == "9":
self.imageQueue.put({"image": high_score_image.pop(key)}) self.imageQueue.put({"image": high_score_image.pop(key)})
logger.info("实时任务正常结束:requestId: {}", self.msg.get("request_id")) logger.info("实时任务正常结束:requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_image"}) pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60*3)
pullProcess.join(60 * 3)
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break break
logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id")) logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id"))
cv2tool.close() cv2tool.close()
if loop: if loop:
loop.close() loop.close()
if pullProcess.is_alive():
if pullProcess is not None and pullProcess.is_alive():
pullProcess.sendCommand({"command": "stop_ex"}) pullProcess.sendCommand({"command": "stop_ex"})
pullProcess.join(60 * 3) pullProcess.join(60 * 3)
if feedback: if feedback:
logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", self.aiFilePath, self.msg.get("request_id")) logger.info("删除AI视频成功, aiFilePath: {}, requestId: {}", self.aiFilePath, self.msg.get("request_id"))




def getPullResultQueue(pullQueue):
eBody = None
try:
eBody = pullQueue.get(block=False)
return eBody
except Exception as e:
pass
return eBody


def buildFrame(pullQueue, cv2tool, mod, content, pic):
frames = []
status = None
for i in range(pullQueue.qsize()):
frame_result = getPullResultQueue(pullQueue)
if frame_result is None:
time.sleep(0.01)
continue
if frame_result.get("status") == '4':
cv2tool.getFrameConfig(int(frame_result.get("fps")), int(frame_result.get("width")),
int(frame_result.get("height")))
frames.append((frame_result, mod, content, pic, cv2tool))
else:
status = frame_result
return frames, status


class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):


def __init__(self, cfg): def __init__(self, cfg):
sign_url=ai_play_url, sign_url=ai_play_url,
analyse_time=TimeUtils.now_date_to_str())}) analyse_time=TimeUtils.now_date_to_str())})



def run(self): def run(self):
cv2tool = None cv2tool = None
pullProcess = None pullProcess = None
LogUtils.init_log(self.content) LogUtils.init_log(self.content)
self.sendhbMessage(AnalysisStatus.WAITING.value, "0.0000", AnalysisType.OFFLINE.value) self.sendhbMessage(AnalysisStatus.WAITING.value, "0.0000", AnalysisType.OFFLINE.value)
mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"])) mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"]))
pullProcess = OfflinePullVideoStreamProcess(self.msg, self.content, self.pullQueue, self.fbQueue, self.imageQueue)
pullProcess = OfflinePullVideoStreamProcess(self.msg, self.content, self.pullQueue, self.fbQueue,
self.imageQueue)
pullProcess.daemon = True pullProcess.daemon = True
pullProcess.start() pullProcess.start()
cv2tool = Cv2Util(None, self.msg.get('push_url'), aiFilePath=self.aiFilePath, requestId=self.msg.get("request_id"))
cv2tool = Cv2Util(None, self.msg.get('push_url'), aiFilePath=self.aiFilePath,
requestId=self.msg.get("request_id"))

high_score_image = {} high_score_image = {}
# 当前帧数 # 当前帧数
self.sendhbMessage(AnalysisStatus.RUNNING.value, "", AnalysisType.OFFLINE.value)
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
pullProcess_timeout = None pullProcess_timeout = None
with ThreadPoolExecutor(max_workers=10) as t:
self.sendhbMessage(AnalysisStatus.RUNNING.value, "", AnalysisType.OFFLINE.value)
task_frame = None
with ThreadPoolExecutor(max_workers=6) as t:
while True: while True:
start = time.time()
if not pullProcess.is_alive(): if not pullProcess.is_alive():
if pullProcess_timeout is None: if pullProcess_timeout is None:
pullProcess_timeout = time.time() pullProcess_timeout = time.time()
if time.time() - pullProcess_timeout > 300: if time.time() - pullProcess_timeout > 300:
logger.info("拉流进程停止异常, requestId: {}", self.msg.get("request_id")) logger.info("拉流进程停止异常, requestId: {}", self.msg.get("request_id"))
raise Exception("拉流进程异常停止") raise Exception("拉流进程异常停止")
# 检查是否获取到视频信息
# 检查是否获取到视频信息
eBody = self.getEvent() eBody = self.getEvent()
if eBody is not None and len(eBody) > 0: if eBody is not None and len(eBody) > 0:
cmdStr = eBody.get("command") cmdStr = eBody.get("command")
if 'stop' == cmdStr: if 'stop' == cmdStr:
logger.info("离线任务开始停止分析, requestId: {}", self.msg.get("request_id")) logger.info("离线任务开始停止分析, requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_pull_stream"}) pullProcess.sendCommand({"command": "stop_pull_stream"})
if self.pullQueue.qsize() == 0:
# logger.info("重试,拉流队列大小: {}, requestId: {}", self.pullQueue.qsize(), self.msg.get("request_id"))
time.sleep(0.1)
continue
frames = [] frames = []
status = None status = None
start = time.time()
logger.info("拉流队列大小: {}, requestId: {}", self.pullQueue.qsize(), self.msg.get("request_id"))
for i in range(self.pullQueue.qsize()):
frame_result = self.getPullQueue()
if frame_result is None:
continue
if frame_result.get("status") == '4':
if cv2tool.fps is None:
cv2tool.fps = int(frame_result.get("fps"))
cv2tool.width = int(frame_result.get("width"))
cv2tool.height = int(frame_result.get("height"))
frames.append((frame_result, mod, self.content, self.pic, cv2tool))
else:
status = frame_result
logger.info("执行时间: {}, requestId: {}", time.time() - start, self.msg.get("request_id"))
start_1 = time.time()
if task_frame is not None:
frames, status = task_frame.result()
task_frame = t.submit(buildFrame, self.pullQueue, cv2tool, mod, self.content, self.pic)
logger.info("帧数:{}, requestId: {}", len(frames), self.msg.get("request_id"))
if len(frames) > 0: if len(frames) > 0:
for result in t.map(process, frames): for result in t.map(process, frames):
if result is not None: if result is not None:
task = loop.create_task(cv2tool.push_stream(frame_merge)) task = loop.create_task(cv2tool.push_stream(frame_merge))
task1 = loop.create_task(cv2tool.video_write(None, frame_merge)) task1 = loop.create_task(cv2tool.video_write(None, frame_merge))
loop.run_until_complete(asyncio.wait([task, task1])) loop.run_until_complete(asyncio.wait([task, task1]))
if frame_all[0].get("cct_frame") % 400 == 0:
task_process = str(format(float(frame_all[0].get("cct_frame")) / float(frame_all[0].get("all_frame")), '.4f'))
self.sendhbMessage(AnalysisStatus.RUNNING.value, task_process, AnalysisType.OFFLINE.value)
if frame_all[0].get("cct_frame") % 600 == 0:
task_process = str(format(
float(frame_all[0].get("cct_frame")) / float(frame_all[0].get("all_frame")),
'.4f'))
self.sendhbMessage(AnalysisStatus.RUNNING.value, task_process,
AnalysisType.OFFLINE.value)
if p_result[2] is not None and len(p_result[2]) > 0: if p_result[2] is not None and len(p_result[2]) > 0:
for ai_analyse_result in p_result[2]: for ai_analyse_result in p_result[2]:
order = str(int(ai_analyse_result[0])) order = str(int(ai_analyse_result[0]))
pullProcess.join(60 * 3) pullProcess.join(60 * 3)
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value) self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break break
logger.info("执行时间1111: {}, requestId: {}", time.time() - start_1, self.msg.get("request_id"))

logger.info("执行时间1111: {}, 队列大小:{}, requestId: {}", time.time() - start, self.pullQueue.qsize(), self.msg.get("request_id"))
if status is None: if status is None:
continue continue
if status.get("status") == "1": if status.get("status") == "1":
for key in list(high_score_image.keys()): for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)}) self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_image"}) pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60*3)
pullProcess.join(60 * 3)
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break break
elif status.get("status") == "3": elif status.get("status") == "3":
for key in list(high_score_image.keys()): for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image.pop(key)}) self.imageQueue.put({"image": high_score_image.pop(key)})
pullProcess.sendCommand({"command": "stop_image"}) pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60*3)
pullProcess.join(60 * 3)
self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value) self.stop_task(cv2tool, pullProcess, AnalysisStatus.TIMEOUT.value)
break break
elif status.get("status") == "9": elif status.get("status") == "9":
self.imageQueue.put({"image": high_score_image.pop(key)}) self.imageQueue.put({"image": high_score_image.pop(key)})
logger.info("实时任务正常结束:requestId: {}", self.msg.get("request_id")) logger.info("实时任务正常结束:requestId: {}", self.msg.get("request_id"))
pullProcess.sendCommand({"command": "stop_image"}) pullProcess.sendCommand({"command": "stop_image"})
pullProcess.join(60*3)
pullProcess.join(60 * 3)
self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, pullProcess, AnalysisStatus.SUCCESS.value)
break break
logger.info("离线进程任务完成,requestId:{}", self.msg.get("request_id")) logger.info("离线进程任务完成,requestId:{}", self.msg.get("request_id"))
cv2tool.close() cv2tool.close()
if loop: if loop:
loop.close() loop.close()
if pullProcess.is_alive():
if pullProcess is not None and pullProcess.is_alive():
pullProcess.sendCommand({"command": "stop_ex"}) pullProcess.sendCommand({"command": "stop_ex"})
pullProcess.join(60 * 3) pullProcess.join(60 * 3)
if feedback is not None: if feedback is not None:
random_num, random_num,
'image', 'image',
msg.get('request_id'), "AI") msg.get('request_id'), "AI")
loop.run_until_complete(asyncio.wait([upload_file(aliyunOssSdk, or_image_name, or_image),
upload_file(aliyunOssSdk, ai_image_name, ai_image)]))
if p_result[2] is not None and len(p_result[2]) > 0: if p_result[2] is not None and len(p_result[2]) > 0:
loop.run_until_complete(asyncio.wait([upload_file(aliyunOssSdk, or_image_name, or_image),
upload_file(aliyunOssSdk, ai_image_name, ai_image)]))
for ai_analyse_result in p_result[2]: for ai_analyse_result in p_result[2]:
order = str(int(ai_analyse_result[0])) order = str(int(ai_analyse_result[0]))
conf_c = ai_analyse_result[5] conf_c = ai_analyse_result[5]
model_type_code, model_type_code,
order, order,
TimeUtils.now_date_to_str())}) TimeUtils.now_date_to_str())})
return True
else:
fbQueue.put({"feedback": FeedBack.message_feedback(msg.get('request_id'),
AnalysisStatus.RUNNING.value,
AnalysisType.IMAGE.value, "", "",
'',
or_image_name,
ai_image_name,
model_type_code,
'None',
TimeUtils.now_date_to_str())})
return True
# else:
# fbQueue.put({"feedback": FeedBack.message_feedback(msg.get('request_id'),
# AnalysisStatus.RUNNING.value,
# AnalysisType.IMAGE.value, "", "",
# '',
# or_image_name,
# ai_image_name,
# model_type_code,
# 'None',
# TimeUtils.now_date_to_str())})
except Exception as e: except Exception as e:
logger.exception("模型分析异常: {}, requestId: {}", e, msg.get("request_id")) logger.exception("模型分析异常: {}, requestId: {}", e, msg.get("request_id"))
return False return False
# 加载模型 # 加载模型
mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"])) mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"]))
imageUrls = self.msg.get("image_urls") imageUrls = self.msg.get("image_urls")
result = True
with ThreadPoolExecutor(max_workers=5) as t: with ThreadPoolExecutor(max_workers=5) as t:
obj_list = [] obj_list = []
for imageUrl in imageUrls: for imageUrl in imageUrls:
for future in as_completed(obj_list): for future in as_completed(obj_list):
data = future.result() data = future.result()
if not data: if not data:
self.sendResult({
"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value,
AnalysisType.IMAGE.value,
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
analyse_time=TimeUtils.now_date_to_str())})
self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.SUCCESS.value,
AnalysisType.IMAGE.value,
progress=Constant.success_progess,
analyse_time=TimeUtils.now_date_to_str())})
result = False
if result:
self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.SUCCESS.value,
AnalysisType.IMAGE.value,
progress=Constant.success_progess,
analyse_time=TimeUtils.now_date_to_str())})
else:
self.sendResult({"feedback": message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value,
AnalysisType.IMAGE.value,
ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
ExceptionType.SERVICE_INNER_EXCEPTION.value[1],
analyse_time=TimeUtils.now_date_to_str())})
logger.info("图片进程任务完成,requestId:{}", self.msg.get("request_id")) logger.info("图片进程任务完成,requestId:{}", self.msg.get("request_id"))
except ServiceException as s: except ServiceException as s:
logger.error("图片分析异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, self.msg.get("request_id")) logger.error("图片分析异常,异常编号:{}, 异常描述:{}, requestId:{}", s.code, s.msg, self.msg.get("request_id"))

+ 16
- 8
test/ffmpeg11/ffmpeg11.py View File

import time import time
import subprocess as sp import subprocess as sp
import ffmpeg import ffmpeg
import numpy
import cv2 import cv2
import sys import sys
import random import random




if __name__ == '__main__': if __name__ == '__main__':
file_path = 'rtmp://live.play.t-aaron.com/live/THSAk'
file_path = 'https://vod.play.t-aaron.com/0bc905ef5651439da2bfba8427fe467e/a76a7ebb6e3b44ef9c0c7820c7e9c574-f2d7ee90cba11aa91971d58e06d295d2-4k.mp4'
#file_path = 'https://vod.play.t-aaron.com/customerTrans/edc96ea2115a0723a003730956208134/40b416f7-183b57f6be0-0004-f90c-f2c-7ec68.mp4' #file_path = 'https://vod.play.t-aaron.com/customerTrans/edc96ea2115a0723a003730956208134/40b416f7-183b57f6be0-0004-f90c-f2c-7ec68.mp4'
#file_path = 'https://vod.play.t-aaron.com/3301fc8e166f45be88f2214e7a8f4a9d/e29535365b54434d9ed2e8c3b0a175da-fba35541b31a1049ca05b145a283c33a-hd.mp4' #file_path = 'https://vod.play.t-aaron.com/3301fc8e166f45be88f2214e7a8f4a9d/e29535365b54434d9ed2e8c3b0a175da-fba35541b31a1049ca05b145a283c33a-hd.mp4'
video_info = get_video_info(file_path) video_info = get_video_info(file_path)
height = int(video_info['height']) height = int(video_info['height'])
command = ['ffmpeg', command = ['ffmpeg',
'-vcodec', 'h264_cuvid', '-vcodec', 'h264_cuvid',
'-resize', '1920x1080',
# '-hwaccel_output_format', 'bgr24',
'-i', file_path, '-i', file_path,
# '-vf', "hwdownload,format=bgr24",
# "-vcodec", "h264_nvenc", # hevc_nvenc h264_nvenc
'-f', 'rawvideo', '-f', 'rawvideo',
'-g', '5',
# '-g', '5',
# '-pix_fmt', 'bgr24', # '-pix_fmt', 'bgr24',
'-an',
# '-hwaccel_output_format', 'bgr24',
# '-an',
'-'] '-']
p = sp.Popen(command, stdout=sp.PIPE) p = sp.Popen(command, stdout=sp.PIPE)
# ai_video_file = cv2.VideoWriter(r"C:\Users\chenyukun\Desktop\shipin\aa.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 30, # ai_video_file = cv2.VideoWriter(r"C:\Users\chenyukun\Desktop\shipin\aa.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 30,
start1 = time.time() start1 = time.time()
while True: while True:
start = time.time() start = time.time()
in_bytes = p.stdout.read(int(width * height * 3))
# in_bytes = p.stdout.read()
in_bytes = p.stdout.read(int(width * height * 3 // 8))
# in_bytes = p.stdout.read(int(width * height * 3/4))
if not in_bytes: if not in_bytes:
print(in_bytes) print(in_bytes)
# ai_video_file.release() # ai_video_file.release()
p.wait() p.wait()
break break
# 转成ndarray # 转成ndarray
in_frame = (np.frombuffer(in_bytes, np.uint8).reshape([int(height), int(width), 3]))
img = (np.frombuffer(in_bytes, np.uint8)).reshape((int(height/2 * 3 // 2), int(width/2)))
bgr_img = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_NV12)
# in_frame = (np.frombuffer(in_bytes, np.uint8).reshape([int(height/2), int(width/2), 3]))
# print("拉流时间:", time.time() - start) # print("拉流时间:", time.time() - start)
# frame = cv2.resize(in_frame, (1280, 720)) # 改变图片尺寸 # frame = cv2.resize(in_frame, (1280, 720)) # 改变图片尺寸
# frame = cv2.cvtColor(in_frame, cv2.COLOR_RGB2BGR) # 转成BGR
# i += 1 # i += 1
# print(round(time.time()-start, 5)) # print(round(time.time()-start, 5))
# #
# p.stdout.close() # p.stdout.close()
# p.wait() # p.wait()
# break # break
# cv2.imshow('frame', frame)
# cv2.imshow('frame', bgr_img)
# cv2.waitKey(1)
# time.sleep(1111) # time.sleep(1111)
p.kill() p.kill()

+ 53
- 6
test/kafka/producer_start.py View File

# result = future.get(timeout=10) # result = future.get(timeout=10)
# print(result) # print(result)


topicName = 'dsp-alg-image-tasks'
# topicName = 'dsp-alg-image-tasks'
# eBody = {
# "request_id": "d4c909912ac741ce81ccef03fd1b2ec46",
# "models": [
# {
# "code": "001",
# "categories": [
# {
# "id": "0",
# "config": {}
# },
# {
# "id": "1",
# "config": {}
# },
# {
# "id": "2",
# "config": {}
# },
# {
# "id": "3",
# "config": {}
# },
# {
# "id": "4",
# "config": {}
# },
# {
# "id": "5",
# "config": {}
# },
# {
# "id": "6",
# "config": {}
# },
# {
# "id": "7",
# "config": {}
# }
# ]
# }],
# "command": "start",
# "image_urls": ["https://image.t-aaron.com/P20221112103326614/2022-11-12-10-37-02_frame-3991-4291_20221112103702452021-offline-P20221112103326614-eb4467a3fe8f405ebf4c44f1b48a7e4b_OR.jpg",
# "https://image.t-aaron.com/P20221112103326614/2022-11-12-10-35-09_frame-1785-2085_20221112103509952824-offline-P20221112103326614-eb4467a3fe8f405ebf4c44f1b48a7e4b_OR.jpg"],
# "results_base_dir": "P20220802133841159"
# }
topicName = 'dsp-alg-offline-tasks'
eBody = { eBody = {
"request_id": "d4c909912ac741ce81ccef03fd1b2ec46", "request_id": "d4c909912ac741ce81ccef03fd1b2ec46",
"models": [ "models": [
"code": "001", "code": "001",
"categories": [ "categories": [
{ {
"id": "0",
"config": {}
"id": "0",
"config": {}
}, },
{ {
"id": "1", "id": "1",
] ]
}], }],
"command": "start", "command": "start",
"image_urls": ["https://image.t-aaron.com/P20221112103326614/2022-11-12-10-37-02_frame-3991-4291_20221112103702452021-offline-P20221112103326614-eb4467a3fe8f405ebf4c44f1b48a7e4b_OR.jpg",
"https://image.t-aaron.com/P20221112103326614/2022-11-12-10-35-09_frame-1785-2085_20221112103509952824-offline-P20221112103326614-eb4467a3fe8f405ebf4c44f1b48a7e4b_OR.jpg"],
"original_url": "https://vod.play.t-aaron.com/0bc905ef5651439da2bfba8427fe467e/a76a7ebb6e3b44ef9c0c7820c7e9c574-f2d7ee90cba11aa91971d58e06d295d2-4k.mp4",
"original_type": ".mp4",
"push_url": "rtmp://live.push.t-aaron.com/live/THSAr",
"results_base_dir": "P20220802133841159" "results_base_dir": "P20220802133841159"
} }
producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'], producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8')) value_serializer=lambda m: json.dumps(m).encode('utf-8'))
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody)
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec46', value=eBody)
result = future.get(timeout=10) result = future.get(timeout=10)
print(result) print(result)

+ 31
- 8
util/Cv2Utils.py View File

self.fps = None self.fps = None
self.width = None self.width = None
self.height = None self.height = None
self.wah= None
self.wh = None self.wh = None
self.h = None
self.hn = None
self.w = None
self.all_frames = None self.all_frames = None
self.bit_rate = None self.bit_rate = None
self.pull_p = None self.pull_p = None
self.requestId = requestId self.requestId = requestId
self.p_push_retry_num = 0 self.p_push_retry_num = 0


def getFrameConfig(self, fps, width, height):
if self.fps is None:
self.fps = fps
self.width = width
self.height = height
self.wh = int(width * height * 3 // 8)
self.wah = '%sx%s' % (int(self.width/2), int(self.height/2))
self.h = int(self.height/2 * 3 // 2)
self.w = int(self.width/2)
self.hn = int(self.height/2)

''' '''
获取视频信息 获取视频信息
''' '''
if height: if height:
self.height = int(height) self.height = int(height)
if width is not None and height is not None: if width is not None and height is not None:
self.wh = int(width * height * 3)
self.wh = int(width * height * 3 // 8)
self.wah = '%sx%s' % (int(self.width/2), int(self.height/2))
self.h = int(self.height/2 * 3 // 2)
self.w = int(self.width/2)
self.hn = int(self.height/2)
if nb_frames: if nb_frames:
self.all_frames = int(nb_frames) self.all_frames = int(nb_frames)
if fps: if fps:


def build_pull_p(self): def build_pull_p(self):
try: try:
if self.wah is None:
return
if self.pull_p: if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId) logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close() self.pull_p.stdout.close()
# # '-s', "{}x{}".format(int(width), int(height)), # # '-s', "{}x{}".format(int(width), int(height)),
# '-an', # '-an',
# '-'] # '-']
input_config = {'c:v': 'h264_cuvid'}
input_config = {'c:v': 'h264_cuvid', 'resize': self.wah}
process = ( process = (
ffmpeg ffmpeg
.input(self.pullUrl, **input_config) .input(self.pullUrl, **input_config)
.output('pipe:', format='rawvideo', pix_fmt='bgr24')
.output('pipe:', format='rawvideo') # pix_fmt='bgr24'
.overwrite_output() .overwrite_output()
.global_args('-an') .global_args('-an')
.run_async(pipe_stdout=True) .run_async(pipe_stdout=True)
# ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1]) # ExceptionType.PULL_PIPELINE_INIT_EXCEPTION.value[1])
in_bytes = self.pull_p.stdout.read(self.wh) in_bytes = self.pull_p.stdout.read(self.wh)
if in_bytes is not None and len(in_bytes) > 0: if in_bytes is not None and len(in_bytes) > 0:
result = (np.frombuffer(in_bytes, np.uint8).reshape([int(self.height), int(self.width), 3]))
result = cv2.resize(result, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
# result = (np.frombuffer(in_bytes, np.uint8).reshape([int(self.height), int(self.width), 3]))
img = (np.frombuffer(in_bytes, np.uint8)).reshape((self.h, self.w))
result = cv2.cvtColor(img, cv2.COLOR_YUV2BGR_NV12)
# result = cv2.resize(result, (int(self.width / 2), int(self.height / 2)), interpolation=cv2.INTER_LINEAR)
except ServiceException as s: except ServiceException as s:
logger.exception("读流异常: {}, requestId:{}", s, self.requestId) logger.exception("读流异常: {}, requestId:{}", s, self.requestId)
except Exception as e: except Exception as e:
'-pix_fmt', 'bgr24', '-pix_fmt', 'bgr24',
'-thread_queue_size', '16', '-thread_queue_size', '16',
# '-s', "{}x{}".format(self.width * 2, self.height), # '-s', "{}x{}".format(self.width * 2, self.height),
'-s', "{}x{}".format(int(self.width), int(self.height / 2)),
'-s', "{}x{}".format(int(self.width), int(self.hn)),
'-r', str(self.fps), '-r', str(self.fps),
'-i', '-', # 指定输入文件 '-i', '-', # 指定输入文件
'-g', str(self.fps), '-g', str(self.fps),
ExceptionType.VIDEO_CONFIG_EXCEPTION.value[1]) ExceptionType.VIDEO_CONFIG_EXCEPTION.value[1])
if self.orFilePath is not None and self.or_video_file is None: if self.orFilePath is not None and self.or_video_file is None:
self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps, self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
(int(self.width / 2), int(self.height / 2)))
(int(self.w), int(self.hn)))
if self.or_video_file is None: if self.or_video_file is None:
raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0], raise ServiceException(ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[0],
ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1]) ExceptionType.OR_WRITE_OBJECT_EXCEPTION.value[1])
if self.aiFilePath is not None and self.ai_video_file is None: if self.aiFilePath is not None and self.ai_video_file is None:
self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps, self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
(int(self.width), int(self.height / 2)))
(int(self.width), int(self.hn)))
if self.ai_video_file is None: if self.ai_video_file is None:
raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0], raise ServiceException(ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[0],
ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1]) ExceptionType.AI_WRITE_OBJECT_EXCEPTION.value[1])

Loading…
Cancel
Save