Sfoglia il codice sorgente

master代码何如develop

tags/V2.4.0
chenyukun 1 anno fa
parent
commit
f758108e91
17 ha cambiato i file con 1209 aggiunte e 267 eliminazioni
  1. +2
    -2
      concurrency/HeartbeatThread.py
  2. +100
    -97
      concurrency/IntelligentRecognitionProcess.py
  3. +45
    -7
      dsp_application.yml
  4. +2
    -11
      service/Dispatcher.py
  5. +7
    -6
      test/asnyc.py
  6. +72
    -11
      test/cv2test1.py
  7. +0
    -0
      test/ffmpeg11/__init__.py
  8. +133
    -0
      test/ffmpeg11/ffmpeg11.py
  9. +291
    -44
      test/ffmpeg2.py
  10. +297
    -0
      test/ffmpeg3.py
  11. BIN
      test/image/1.jpg
  12. BIN
      test/image/2.jpg
  13. +70
    -49
      test/producer_start.py
  14. +13
    -13
      test/余弦相似度计算.py
  15. +115
    -11
      util/Cv2Utils.py
  16. +6
    -6
      util/KafkaUtils.py
  17. +56
    -10
      util/ModelUtils.py

+ 2
- 2
concurrency/HeartbeatThread.py Vedi File

feedback = message_feedback(self.request_id, AnalysisStatus.RUNNING.value, self.mode_service, feedback = message_feedback(self.request_id, AnalysisStatus.RUNNING.value, self.mode_service,
analyse_time=TimeUtils.now_date_to_str()) analyse_time=TimeUtils.now_date_to_str())
self.sendResult({"feedback": feedback}) self.sendResult({"feedback": feedback})
num += 2
time.sleep(2)
num += 3
time.sleep(3)


logger.info("心跳线程停止完成!requestId:{}", self.request_id) logger.info("心跳线程停止完成!requestId:{}", self.request_id)

+ 100
- 97
concurrency/IntelligentRecognitionProcess.py Vedi File

import os import os
import time import time
import copy import copy

import numpy as np

from common import Constant from common import Constant
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from loguru import logger from loguru import logger




class IntelligentRecognitionProcess(Process): class IntelligentRecognitionProcess(Process):
def __init__(self, cfg, config):
def __init__(self, cfg):
super().__init__() super().__init__()
self.fbQueue = cfg.get("fbQueue") self.fbQueue = cfg.get("fbQueue")
self.eventQueue = Queue() self.eventQueue = Queue()
self.gpu_ids = cfg.get("gpu_ids") self.gpu_ids = cfg.get("gpu_ids")
self.pic = cfg.get("pic") self.pic = cfg.get("pic")
self.logo = cfg.get("logo") self.logo = cfg.get("logo")
self.config = config


# 给本进程发送事件 # 给本进程发送事件
def sendEvent(self, eBody): def sendEvent(self, eBody):
# 停止任务方法 # 停止任务方法
def stop_task(self, cv2tool, orFilePath, aiFilePath, snalysisStatus): def stop_task(self, cv2tool, orFilePath, aiFilePath, snalysisStatus):
# 停止cv2相关配置 # 停止cv2相关配置
cv2tool.stop_cv2()
cv2tool.close()
if not os.path.exists(orFilePath) or not os.path.exists(aiFilePath): if not os.path.exists(orFilePath) or not os.path.exists(aiFilePath):
logger.error("原视频或AI视频不存在!requestId:{}", self.msg.get("request_id")) logger.error("原视频或AI视频不存在!requestId:{}", self.msg.get("request_id"))
raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0], raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
LogUtils.init_log(self.content) LogUtils.init_log(self.content)
# 程序开始时间 # 程序开始时间
start_time = time.time() start_time = time.time()
mod, model_type_code, modelConfig = get_model((self.config, str(self.gpu_ids[0]), self.msg["models"]))
# mod_thread = Common(None, func=get_model, args=(self.config, str(self.gpu_ids[0]), self.msg["models"]))
# mod_thread.setDaemon(True)
# mod_thread.start()
# mod = None
# model_type_code = None
# modelConfig = None

mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"]))
# 启动公共进程包含(图片上传线程,心跳线程,问题反馈线程) # 启动公共进程包含(图片上传线程,心跳线程,问题反馈线程)
commonProcess = CommonProcess(self.fbQueue, None, self.content, self.msg, self.imageQueue, commonProcess = CommonProcess(self.fbQueue, None, self.content, self.msg, self.imageQueue,
AnalysisType.ONLINE.value) AnalysisType.ONLINE.value)
".mp4") ".mp4")
cv2tool = Cv2Util(self.msg.get('pull_url'), self.msg.get('push_url'), orFilePath, aiFilePath, cv2tool = Cv2Util(self.msg.get('pull_url'), self.msg.get('push_url'), orFilePath, aiFilePath,
self.msg.get("request_id")) self.msg.get("request_id"))
cv2tool.build_cv2()
cv2tool.get_video_info()
# cv2重试初始化次数 # cv2重试初始化次数
cv2_init_num = 1 cv2_init_num = 1
# 解决开始拉流失败问题,解决初始化fps,height,weight获取不到问题 # 解决开始拉流失败问题,解决初始化fps,height,weight获取不到问题
event_command = event.get("command") event_command = event.get("command")
if 'stop' == event_command: if 'stop' == event_command:
logger.info("实时任务开始停止分析, requestId: {}", self.msg.get("request_id")) logger.info("实时任务开始停止分析, requestId: {}", self.msg.get("request_id"))
cv2tool.build_write()
self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value)
break break
if cv2tool.cap is None or not cv2tool.cap.isOpened():
if cv2tool.checkconfig():
pull_stream_init_timeout = time.time() - start_time pull_stream_init_timeout = time.time() - start_time
if pull_stream_init_timeout > int(self.content["service"]["cv2_pull_stream_timeout"]): if pull_stream_init_timeout > int(self.content["service"]["cv2_pull_stream_timeout"]):
logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout, logger.info("开始拉流超时, 超时时间:{}, requestId:{}", pull_stream_init_timeout,
logger.info("cv2开始拉流初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id")) logger.info("cv2开始拉流初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
cv2_init_num += 1 cv2_init_num += 1
time.sleep(1) time.sleep(1)
cv2tool.build_cv2()
cv2tool.get_video_info()
continue continue
else: else:
break break
cv2_init_num = 1 cv2_init_num = 1
cv2tool.build_write() cv2tool.build_write()
cv2tool.build_pull_p()
cv2tool.build_p()
high_score_image = {} high_score_image = {}
step = int(self.content["service"]["frame_step"]) step = int(self.content["service"]["frame_step"])
pull_start_time = None pull_start_time = None
read_start_time = None read_start_time = None
# 模型初始化次数 # 模型初始化次数
# model = 0 # model = 0
concurrent_frame = 1
while True: while True:
end_time = time.time() end_time = time.time()
create_task_time = end_time - start_time create_task_time = end_time - start_time
logger.info("实时任务开始停止, requestId: {}", self.msg.get("request_id")) logger.info("实时任务开始停止, requestId: {}", self.msg.get("request_id"))
self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.SUCCESS.value)
break break
# 检测是否断流
if cv2tool.cap is None or not cv2tool.cap.isOpened():
if pull_start_time is None:
pull_start_time = time.time()
# 默认1个小时
pull_stream_timeout = time.time() - pull_start_time
if pull_stream_timeout > int(self.content["service"]["cv2_read_stream_timeout"]):
if high_score_image is not None and len(high_score_image) > 0:
for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image[key]})
del high_score_image[key]
logger.info("拉流超时, 超时时间:{}, requestId:{}", pull_stream_timeout, self.msg.get("request_id"))
self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.TIMEOUT.value)
break
logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
cv2_init_num += 1
time.sleep(1)
cv2tool.build_cv2()
continue
# if cv2tool.pull_p is None:
# if pull_start_time is None:
# pull_start_time = time.time()
# # 默认1个小时
# pull_stream_timeout = time.time() - pull_start_time
# if pull_stream_timeout > int(self.content["service"]["cv2_read_stream_timeout"]):
# if high_score_image is not None and len(high_score_image) > 0:
# for key in list(high_score_image.keys()):
# self.imageQueue.put({"image": high_score_image[key]})
# del high_score_image[key]
# logger.info("拉流超时, 超时时间:{}, requestId:{}", pull_stream_timeout, self.msg.get("request_id"))
# self.stop_task(cv2tool, orFilePath, aiFilePath, AnalysisStatus.TIMEOUT.value)
# break
# logger.info("拉流初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
# cv2_init_num += 1
# time.sleep(1)
# cv2tool.build_pull_p()
# continue
pull_start_time = None pull_start_time = None
is_opened, frame = cv2tool.getCap().read()
if is_opened is None or not is_opened:
in_bytes = cv2tool.pull_p.stdout.read(cv2tool.width * cv2tool.height * 3)
if not in_bytes:
if read_start_time is None: if read_start_time is None:
read_start_time = time.time() read_start_time = time.time()
read_stream_timeout = time.time() - read_start_time read_stream_timeout = time.time() - read_start_time
time.sleep(1) time.sleep(1)
logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id")) logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
cv2_init_num += 1 cv2_init_num += 1
cv2tool.build_cv2()
cv2tool.build_pull_p()
continue continue
read_start_time = None read_start_time = None
# if mod is None and model == 0:
# model += 1
# logger.info("初始化模型: {}次, requestId: {}", model, self.msg.get("request_id"))
# mod, model_type_code, modelConfig = mod_thread.get_result()
frame = (np.frombuffer(in_bytes, np.uint8).reshape([cv2tool.height, cv2tool.width, 3]))
# time00 = time.time() # time00 = time.time()
# 调用AI模型 # 调用AI模型
p_result, timeOut = mod.process(copy.deepcopy(frame), modelConfig)
p_result, timeOut = mod.process(copy.deepcopy(frame), cv2tool.width)
# time11 = time.time() # time11 = time.time()
# if time11 - time00 > 1: # if time11 - time00 > 1:
# logger.info("算法模型调度时间:{}s, requestId:{}", int(time11-time00), self.msg.get("request_id")) # logger.info("算法模型调度时间:{}s, requestId:{}", int(time11-time00), self.msg.get("request_id"))
logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.msg.get("request_id")) logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.msg.get("request_id"))
break break
# # 问题图片加入队列, 暂时写死,后期修改为真实问题 # # 问题图片加入队列, 暂时写死,后期修改为真实问题
cf = int(cv2tool.cap.get(1))
if p_result[2] is not None and len(p_result[2]) > 0: if p_result[2] is not None and len(p_result[2]) > 0:
for ai_analyse_result in p_result[2]: for ai_analyse_result in p_result[2]:
order = str(int(ai_analyse_result[0])) order = str(int(ai_analyse_result[0]))
high_score_image[order] = { high_score_image[order] = {
"or_frame": frame, "or_frame": frame,
"ai_frame": p_result[1], "ai_frame": p_result[1],
"current_frame": cf,
"last_frame": cf + step,
"current_frame": concurrent_frame,
"last_frame": concurrent_frame + step,
"progress": "", "progress": "",
"mode_service": "online", "mode_service": "online",
"model_type_code": model_type_code, "model_type_code": model_type_code,
high_score_image[order] = { high_score_image[order] = {
"or_frame": frame, "or_frame": frame,
"ai_frame": p_result[1], "ai_frame": p_result[1],
"current_frame": cf,
"last_frame": cf + step,
"current_frame": concurrent_frame,
"last_frame": concurrent_frame + step,
"progress": "", "progress": "",
"mode_service": "online", "mode_service": "online",
"model_type_code": model_type_code, "model_type_code": model_type_code,
"model_detection_code": order, "model_detection_code": order,
"socre": conf_c "socre": conf_c
} }
if cf % step == 0 and len(high_score_image) > 0:
if concurrent_frame % step == 0 and len(high_score_image) > 0:
if self.content["service"]["filter"]["picture_similarity"]: if self.content["service"]["filter"]["picture_similarity"]:
for key in list(high_score_image.keys()): for key in list(high_score_image.keys()):
hash1 = ImageUtils.dHash(high_score_image[key].get("ai_frame")) hash1 = ImageUtils.dHash(high_score_image[key].get("ai_frame"))
for value in high_score_image.values(): for value in high_score_image.values():
self.imageQueue.put({"image": value}) self.imageQueue.put({"image": value})
high_score_image.clear() high_score_image.clear()
concurrent_frame += 1
logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id")) logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id"))
except ServiceException as s: except ServiceException as s:
self.sendResult({"command": "stop_heartbeat_imageFileUpdate"}) self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
analyse_time=TimeUtils.now_date_to_str())}) analyse_time=TimeUtils.now_date_to_str())})
finally: finally:
if cv2tool is not None: if cv2tool is not None:
cv2tool.stop_cv2()
cv2tool.close()
self.sendResult({"command": "stop"}) self.sendResult({"command": "stop"})
commonProcess.join() commonProcess.join()
# 删除本地视频文件 # 删除本地视频文件
class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess):


def stop_task(self, cv2tool, aiFilePath, analysisStatus): def stop_task(self, cv2tool, aiFilePath, analysisStatus):
cv2tool.stop_cv2()
cv2tool.close()
if not os.path.exists(aiFilePath): if not os.path.exists(aiFilePath):
logger.error("AI视频不存在!requestId:{}", self.msg.get("request_id")) logger.error("AI视频不存在!requestId:{}", self.msg.get("request_id"))
raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0], raise ServiceException(ExceptionType.VIDEO_ADDRESS_EXCEPTION.value[0],
LogUtils.init_log(self.content) LogUtils.init_log(self.content)
# 程序开始时间 # 程序开始时间
start_time = time.time() start_time = time.time()
mod, model_type_code, modelConfig = get_model((self.config, str(self.gpu_ids[0]), self.msg["models"]))
# mod_thread = Common(None, func=get_model, args=(self.config, str(self.gpu_ids[0]), self.msg["models"]))
# mod_thread.setDaemon(True)
# mod_thread.start()
# mod = None
# model_type_code = None
# modelConfig = None
mod, model_type_code = get_model((str(self.gpu_ids[0]), self.msg["models"]))
# 创建心跳队列 # 创建心跳队列
hbQueue = Queue() hbQueue = Queue()
# 结果反馈进程启动 # 结果反馈进程启动
".mp4") ".mp4")
cv2tool = Cv2Util(self.msg.get('original_url'), self.msg.get('push_url'), aiFilePath=aiFilePath, cv2tool = Cv2Util(self.msg.get('original_url'), self.msg.get('push_url'), aiFilePath=aiFilePath,
requestId=self.msg.get("request_id")) requestId=self.msg.get("request_id"))
cv2tool.build_cv2()
cv2tool.build_write(False)
# cv2重试初始化次数 # cv2重试初始化次数
cv2_init_num = 1 cv2_init_num = 1
high_score_image = {} high_score_image = {}
step = int(self.content["service"]["frame_step"]) step = int(self.content["service"]["frame_step"])
# 模型初始化速度
# model = 0
# 总视频帧数
all_f = None
if cv2tool.cap is not None:
all_f = int(cv2tool.cap.get(7))
# 当前帧数
concurrent_frame = 1
cv2tool.get_video_info()
cv2tool.build_write(False)
while True: while True:
end_time = time.time() end_time = time.time()
create_task_time = end_time - start_time create_task_time = end_time - start_time
if not commonProcess.is_alive(): if not commonProcess.is_alive():
logger.info("图片上传、心跳、问题反馈进程异常停止, requestId: {}", self.msg.get("request_id")) logger.info("图片上传、心跳、问题反馈进程异常停止, requestId: {}", self.msg.get("request_id"))
raise Exception("图片上传、心跳、问题反馈进程异常停止") raise Exception("图片上传、心跳、问题反馈进程异常停止")
# 检查是否获取到视频信息
eBody = self.getEvent() eBody = self.getEvent()
if eBody is not None and len(eBody) > 0: if eBody is not None and len(eBody) > 0:
cmdStr = eBody.get("command") cmdStr = eBody.get("command")
logger.info("离线任务开始停止分析, requestId: {}", self.msg.get("request_id")) logger.info("离线任务开始停止分析, requestId: {}", self.msg.get("request_id"))
self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value)
break break
# 检测是否断流
if cv2tool.cap is None or not cv2tool.cap.isOpened():
logger.info("cv2初始化重试:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
if cv2tool.checkconfig():
logger.info("视频信息获取次数:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
if cv2_init_num >= 3:
raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
cv2_init_num += 1
time.sleep(1)
cv2tool.get_video_info()
cv2tool.build_write(False)
continue
cv2_init_num = 0
if cv2tool.pull_p is None:
logger.info("视频拉流管道初始化次数:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
if cv2_init_num >= 3: if cv2_init_num >= 3:
raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0], raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0],
ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1]) ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1])
cv2_init_num += 1 cv2_init_num += 1
time.sleep(1) time.sleep(1)
cv2tool.build_cv2()
all_f = int(cv2tool.cap.get(7))
cv2tool.build_pull_p()
continue
cv2_init_num = 0
if cv2tool.p is None:
logger.info("视频推流管道初始化次数:{}次, requestId: {}", cv2_init_num, self.msg.get("request_id"))
if cv2_init_num >= 3:
raise ServiceException(ExceptionType.AI_VIDEO_ADDRESS_EXCEPTION.value[0],
ExceptionType.AI_VIDEO_ADDRESS_EXCEPTION.value[1])
cv2_init_num += 1
time.sleep(1)
cv2tool.build_p()
continue continue
start_read_time = time.time() start_read_time = time.time()
is_opened, frame = cv2tool.cap.read()
cf = int(cv2tool.cap.get(1))
if is_opened is None or not is_opened:
logger.info("总帧数: {}, 当前帧数: {}, requestId: {}, is_opened: {}", float(cv2tool.cap.get(7)),
cv2tool.cap.get(1), self.msg.get("request_id"), is_opened)
in_bytes = cv2tool.pull_p.stdout.read(cv2tool.width * cv2tool.height * 3)
if not in_bytes:
logger.info("总帧数: {}, 当前帧数: {}, requestId: {}, in_bytes: {}", cv2tool.all_frames,
concurrent_frame, self.msg.get("request_id"), in_bytes)
logger.info("离线读流结束,读流时间: {}", time.time() - start_read_time) logger.info("离线读流结束,读流时间: {}", time.time() - start_read_time)
if high_score_image is not None and len(high_score_image) > 0: if high_score_image is not None and len(high_score_image) > 0:
for key in list(high_score_image.keys()): for key in list(high_score_image.keys()):
self.imageQueue.put({"image": high_score_image[key]}) self.imageQueue.put({"image": high_score_image[key]})
del high_score_image[key] del high_score_image[key]
if float(cf) < float(all_f):
if concurrent_frame < cv2tool.all_frames:
logger.info("离线异常结束:requestId: {}", self.msg.get("request_id")) logger.info("离线异常结束:requestId: {}", self.msg.get("request_id"))
self.stop_task(cv2tool, aiFilePath, AnalysisStatus.TIMEOUT.value) self.stop_task(cv2tool, aiFilePath, AnalysisStatus.TIMEOUT.value)
break break
logger.info("任务开始结束分析, requestId: {}", self.msg.get("request_id")) logger.info("任务开始结束分析, requestId: {}", self.msg.get("request_id"))
self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value) self.stop_task(cv2tool, aiFilePath, AnalysisStatus.SUCCESS.value)
break break
# if mod is None and model == 0:
# model += 1
# logger.info("初始化模型: {}次, requestId: {}", model, self.msg.get("request_id"))
# mod, model_type_code, modelConfig = mod_thread.get_result()
# time00 = time.time()
frame = (np.frombuffer(in_bytes, np.uint8).reshape([cv2tool.height, cv2tool.width, 3]))
time00 = time.time()
# 调用AI模型 # 调用AI模型
p_result, timeOut = mod.process(copy.deepcopy(frame), modelConfig)
# logger.info("算法模型调度时间:{}s, requestId:{}", time.time() - time00, self.msg.get("request_id"))
p_result, timeOut = mod.process(copy.deepcopy(frame), cv2tool.width)
logger.info("算法模型调度时间:{}s, requestId:{}", time.time() - time00, self.msg.get("request_id"))
# 原视频保存本地、AI视频保存本地 # 原视频保存本地、AI视频保存本地
if self.content["video"]["video_add_water"]: if self.content["video"]["video_add_water"]:
frame = self.pic.common_water_1(frame, self.logo) frame = self.pic.common_water_1(frame, self.logo)
logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.msg.get("request_id")) logger.exception("构建p管道重试失败:{}, requestId: {}", e, self.msg.get("request_id"))
break break
# # 问题图片加入队列, 暂时写死,后期修改为真实问题 # # 问题图片加入队列, 暂时写死,后期修改为真实问题
if cf % 400 == 0:
hbQueue.put({"cf": cf, "af": all_f})
if concurrent_frame % 400 == 0:
hbQueue.put({"cf": concurrent_frame, "af": cv2tool.all_frames})
if p_result[2] is not None and len(p_result[2]) > 0: if p_result[2] is not None and len(p_result[2]) > 0:
for ai_analyse_result in p_result[2]: for ai_analyse_result in p_result[2]:
order = str(int(ai_analyse_result[0])) order = str(int(ai_analyse_result[0]))
high_score_image[order] = { high_score_image[order] = {
"or_frame": frame, "or_frame": frame,
"ai_frame": p_result[1], "ai_frame": p_result[1],
"current_frame": cf,
"last_frame": cf + step,
"current_frame": concurrent_frame,
"last_frame": concurrent_frame + step,
"progress": "", "progress": "",
"mode_service": "offline", "mode_service": "offline",
"model_type_code": model_type_code, "model_type_code": model_type_code,
high_score_image[order] = { high_score_image[order] = {
"or_frame": frame, "or_frame": frame,
"ai_frame": p_result[1], "ai_frame": p_result[1],
"current_frame": cf,
"last_frame": cf + step,
"current_frame": concurrent_frame,
"last_frame": concurrent_frame + step,
"progress": "", "progress": "",
"mode_service": "offline", "mode_service": "offline",
"model_type_code": model_type_code, "model_type_code": model_type_code,
"model_detection_code": order, "model_detection_code": order,
"socre": conf_c "socre": conf_c
} }
if cf % step == 0 and len(high_score_image) > 0:
if concurrent_frame % step == 0 and len(high_score_image) > 0:
if self.content["service"]["filter"]["picture_similarity"]: if self.content["service"]["filter"]["picture_similarity"]:
for key in list(high_score_image.keys()): for key in list(high_score_image.keys()):
hash1 = ImageUtils.dHash(high_score_image[key].get("ai_frame")) hash1 = ImageUtils.dHash(high_score_image[key].get("ai_frame"))
similarity = 1 - dist * 1.0 / 64 similarity = 1 - dist * 1.0 / 64
if similarity < self.content["service"]["filter"]["similarity"]: if similarity < self.content["service"]["filter"]["similarity"]:
self.imageQueue.put({"image": high_score_image[key]}) self.imageQueue.put({"image": high_score_image[key]})
del high_score_image[key]
del high_score_image[key]
else: else:
for value in high_score_image.values(): for value in high_score_image.values():
self.imageQueue.put({"image": value}) self.imageQueue.put({"image": value})
high_score_image.clear() high_score_image.clear()
hbQueue.put({"cf": cf, "af": all_f})
hbQueue.put({"cf": concurrent_frame, "af": cv2tool.all_frames})
concurrent_frame += 1
logger.info("分析总时间{},requestId:{}", time.time()-end_time,self.msg.get("request_id"))
logger.info("离线进程任务完成,requestId:{}", self.msg.get("request_id")) logger.info("离线进程任务完成,requestId:{}", self.msg.get("request_id"))
except ServiceException as s: except ServiceException as s:
self.sendResult({"command": "stop_heartbeat_imageFileUpdate"}) self.sendResult({"command": "stop_heartbeat_imageFileUpdate"})
analyse_time=TimeUtils.now_date_to_str())}) analyse_time=TimeUtils.now_date_to_str())})
finally: finally:
if cv2tool is not None: if cv2tool is not None:
cv2tool.stop_cv2()
cv2tool.close()
self.sendResult({"command": "stop"}) self.sendResult({"command": "stop"})
commonProcess.join() commonProcess.join()
# 删除本地视频文件 # 删除本地视频文件


def get_model(args): def get_model(args):
logger.info("######################开始加载模型######################") logger.info("######################开始加载模型######################")
for model in args[2]:
for model in args[1]:
try: try:
code = model.get("code") code = model.get("code")
needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")] needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")]
logger.info("code:{}, 检查目标:{}, gpuId:{}", code, needed_objectsIndex, args[1])
logger.info("code:{}, 检查目标:{}, gpuId:{}", code, needed_objectsIndex, args[0])
if code == ModelType.WATER_SURFACE_MODEL.value[1]: if code == ModelType.WATER_SURFACE_MODEL.value[1]:
logger.info("######################加载河道模型######################") logger.info("######################加载河道模型######################")
return ModelUtils.SZModel(args[1], needed_objectsIndex), code, args[0].get("sz")
return ModelUtils.SZModel(args[0], needed_objectsIndex), code
elif code == ModelType.FOREST_FARM_MODEL.value[1]: elif code == ModelType.FOREST_FARM_MODEL.value[1]:
logger.info("######################加载林场模型######################") logger.info("######################加载林场模型######################")
return ModelUtils.LCModel(args[1], needed_objectsIndex), code, args[0].get("lc")
return ModelUtils.LCModel(args[0], needed_objectsIndex), code
elif code == ModelType.TRAFFIC_FARM_MODEL.value[1]: elif code == ModelType.TRAFFIC_FARM_MODEL.value[1]:
logger.info("######################加载交通模型######################") logger.info("######################加载交通模型######################")
return ModelUtils.RFModel(args[1], needed_objectsIndex), code, args[0].get("rf")
return ModelUtils.RFModel(args[0], needed_objectsIndex), code
else: else:
logger.error("未匹配到对应的模型") logger.error("未匹配到对应的模型")
raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0], raise ServiceException(ExceptionType.AI_MODEL_MATCH_EXCEPTION.value[0],

+ 45
- 7
dsp_application.yml Vedi File

kafka: kafka:
topic: topic:
dsp-alg-online-tasks-topic: dsp-alg-online-tasks dsp-alg-online-tasks-topic: dsp-alg-online-tasks
dsp-alg-offline-tasks-topic: dsp-alg-offline-tasks
dsp-alg-results-topic: dsp-alg-task-results
local:
bootstrap_servers: ['192.168.10.11:9092']
dsp-alg-online-tasks: dsp-alg-online-tasks:
partition: [0] partition: [0]
dsp-alg-offline-tasks-topic: dsp-alg-offline-tasks
dsp-alg-offline-tasks: dsp-alg-offline-tasks:
partition: [0] partition: [0]
dsp-alg-results-topic: dsp-alg-task-results
dsp-alg-task-results: dsp-alg-task-results:
partition: [0] partition: [0]
local:
bootstrap_servers: ['192.168.10.11:9092']
producer: producer:
acks: -1 acks: -1
retries: 3 retries: 3
max_poll_records: 1 max_poll_records: 1
dev: dev:
bootstrap_servers: ['192.168.11.13:9092'] bootstrap_servers: ['192.168.11.13:9092']
dsp-alg-online-tasks:
partition: [0]
dsp-alg-offline-tasks:
partition: [0]
dsp-alg-task-results:
partition: [0]
producer: producer:
acks: -1 acks: -1
retries: 3 retries: 3
max_poll_records: 1 max_poll_records: 1
test: test:
bootstrap_servers: ['192.168.11.242:9092'] bootstrap_servers: ['192.168.11.242:9092']
dsp-alg-online-tasks:
partition: [0]
dsp-alg-offline-tasks:
partition: [0]
dsp-alg-task-results:
partition: [0]
producer: producer:
acks: -1 acks: -1
retries: 3 retries: 3
auto_offset_reset: latest auto_offset_reset: latest
enable_auto_commit: False enable_auto_commit: False
max_poll_records: 1 max_poll_records: 1
prod:
prod12:
bootstrap_servers: ['101.132.127.1:19092']
dsp-alg-online-tasks:
partition: [0]
dsp-alg-offline-tasks:
partition: [0]
dsp-alg-task-results:
partition: [0]
producer:
acks: -1
retries: 3
linger_ms: 50
retry_backoff_ms: 1000
max_in_flight_requests_per_connection: 5
consumer:
client_id: dsp_ai_server
group_id: dsp-ai-prod
auto_offset_reset: latest
enable_auto_commit: False
max_poll_records: 1
prod13:
bootstrap_servers: ['101.132.127.1:19092'] bootstrap_servers: ['101.132.127.1:19092']
dsp-alg-online-tasks:
partition: [1]
dsp-alg-offline-tasks:
partition: [1]
dsp-alg-task-results:
partition: [1]
producer: producer:
acks: -1 acks: -1
retries: 3 retries: 3
# 日志设置 # 日志设置
log: log:
# 是否开启文件输出 True:开启 False:关闭 # 是否开启文件输出 True:开启 False:关闭
enable_file_log: True
enable_file_log: False
# 是否开启控制台日志输出 True:开启 False:关闭 # 是否开启控制台日志输出 True:开启 False:关闭
enable_stderr: False
enable_stderr: True
# 日志打印文件夹 # 日志打印文件夹
base_path: /home/DATA/dsp/python/logs/ base_path: /home/DATA/dsp/python/logs/
# 日志文件名称 # 日志文件名称

+ 2
- 11
service/Dispatcher.py Vedi File

import GPUtil import GPUtil
import cv2 import cv2


from config.ModelConfig import SZModelConfig, LCModelConfig, RFModelConfig
from util import YmlUtils, FileUtils, LogUtils from util import YmlUtils, FileUtils, LogUtils
from loguru import logger from loguru import logger
from multiprocessing import Queue from multiprocessing import Queue
self.offlineMpt = None self.offlineMpt = None
self.pic = PictureWaterMark() self.pic = PictureWaterMark()
self.logo = cv2.imread("./image/logo.png", -1) self.logo = cv2.imread("./image/logo.png", -1)
sz = SZModelConfig()
lc = LCModelConfig()
rf = RFModelConfig()
self.config = {
"sz": (sz.names, sz.label_arraylist, sz.rainbows, sz.conf_thres, sz.iou_thres),
"lc": (lc.names, lc.label_arraylist, lc.rainbows, lc.conf_thres, lc.iou_thres),
"rf": (rf.names, rf.label_arraylist, rf.rainbows, rf.conf_thres, rf.iou_thres),
}


# 服务调用启动方法 # 服务调用启动方法
def start_service(self): def start_service(self):
cfg = {"fbQueue": Queue(), "imageQueue": Queue(), "content": content, "msg": msg, "gpu_ids": gpu_ids, cfg = {"fbQueue": Queue(), "imageQueue": Queue(), "content": content, "msg": msg, "gpu_ids": gpu_ids,
"pic": self.pic, "logo": self.logo} "pic": self.pic, "logo": self.logo}
# 创建在线识别进程并启动 # 创建在线识别进程并启动
oirp = OnlineIntelligentRecognitionProcess(cfg, self.config)
oirp = OnlineIntelligentRecognitionProcess(cfg)
oirp.start() oirp.start()
# 记录请求与进程映射 # 记录请求与进程映射
self.onlineProcesses[msg.get("request_id")] = oirp self.onlineProcesses[msg.get("request_id")] = oirp
cfg = {"fbQueue": Queue(), "imageQueue": Queue(), "content": content, "msg": msg, "gpu_ids": gpu_ids, cfg = {"fbQueue": Queue(), "imageQueue": Queue(), "content": content, "msg": msg, "gpu_ids": gpu_ids,
"pic": self.pic, "logo": self.logo} "pic": self.pic, "logo": self.logo}
# 创建在线识别进程并启动 # 创建在线识别进程并启动
ofirp = OfflineIntelligentRecognitionProcess(cfg, self.config)
ofirp = OfflineIntelligentRecognitionProcess(cfg)
ofirp.start() ofirp.start()
self.offlineProcesses[msg.get("request_id")] = ofirp self.offlineProcesses[msg.get("request_id")] = ofirp



+ 7
- 6
test/asnyc.py Vedi File

print("222222222222222222222222222222222222222222222222") print("222222222222222222222222222222222222222222222222")


if __name__ == "__main__": if __name__ == "__main__":

aa = threading.Thread(target=fun4)
aa.start()
time.sleep(5)
while True:
print("111111111111111111111111111")
aa = round(3840/1920*3)
print(aa)
# aa = threading.Thread(target=fun4)
# aa.start()
# time.sleep(5)
# while True:
# print("111111111111111111111111111")

+ 72
- 11
test/cv2test1.py Vedi File

import time
import cv2 import cv2
import subprocess as sp
# 推流
import numpy as np


if __name__ == "__main__":
# 先把图片灰度处理。
img = cv2.imread('image/AI6.jpg')
img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
if __name__== "__main__":


template = cv2.imread('image/AI.jpg')
template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
h, w = template.shape[:2]


# 匹配
result = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
# with open(str(cv2.__file__),"r") as f:
# print (f.read())


min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
print(min_val, max_val, min_loc, max_loc)
# cap = cv2.VideoCapture("rtmp://live.play.t-aaron.com/live/THSAs")
# # Get video information
# fps = int(cap.get(cv2.CAP_PROP_FPS))
# print(fps)
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# print(width)
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# print(height)
# fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
# print(fourcc)
# # print(cv2.getBuildInformation())
# # cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
# # cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
# # cap.set(cv2.CAP_PROP_FPS, 60)
# # cap.setExceptionMode(True)
# print(cap.getExceptionMode())




# ffmpeg command
command = ['ffmpeg',
'-y', # 不经过确认,输出时直接覆盖同名文件。
'-f', 'rawvideo',
'-vcodec','rawvideo',
'-pix_fmt', 'bgr24',
'-s', "{}x{}".format(1920, 1080),
# '-s', "{}x{}".format(1280, 720),
'-i', '-', # 指定输入文件
'-c:v', 'libx264', # 指定视频编码器
'-pix_fmt', 'yuv420p',
'-g', '5',
"-an",
'-b:v', '3000k',
'-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
'-f', 'flv',
"rtmp://live.push.t-aaron.com/live/THSAk"]

# 管道配置
p = sp.Popen(command, stdin=sp.PIPE, shell=False)

command = ['ffmpeg',
# '-b:v', '3000k',
'-i', 'rtmp://live.play.t-aaron.com/live/THSAs',
'-f', 'rawvideo',
'-vcodec','rawvideo',
'-pix_fmt', 'bgr24',
# '-s', "{}x{}".format(int(width), int(height)),
'-an',
'-']
pull_p = sp.Popen(command, stdout=sp.PIPE)
# while(cap.isOpened()):
while True:
start =time.time()
# ret, frame = cap.read()
# print(cap.grab())
in_bytes = pull_p.stdout.read(1920 * 1080 * 3)
print("aaaaaaaaaaaaaaa", time.time()-start)
# print("aaaaaaaaaaaaaa", time.time()-start)
# start =time.time()
# a,b = cap.retrieve()
if not in_bytes:
print("Opening camera is failed")
break
frame = (np.frombuffer(in_bytes, np.uint8).reshape([1080, 1920, 3]))
print("ccccccccccccccc", time.time()-start)
p.stdin.write(frame.tostring())
print("bbbbbbbbbbbbbb", time.time()-start)



+ 0
- 0
test/ffmpeg11/__init__.py Vedi File


+ 133
- 0
test/ffmpeg11/ffmpeg11.py Vedi File

import json
import time
import subprocess as sp
import ffmpeg
import numpy
import cv2
import sys
import random

import numpy as np


def read_frame_as_jpeg(in_file, frame_num):
"""
指定帧数读取任意帧
"""
out, err = (
ffmpeg.input(in_file)
.filter('select', 'gte(n,{})'.format(frame_num))
.output('pipe:', vframes=1, format='image2', vcodec='mjpeg')
.run(capture_stdout=True)
)
return out


"""
获取视频基本信息
"""


def get_video_info(in_file):
try:
probe = ffmpeg.probe(in_file)
# format = probe['format']
# size = int(format['size'])/1024/1024
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
if video_stream is None:
print('No video stream found', file=sys.stderr)
return
width = int(video_stream['width'])
height = int(video_stream['height'])
# num_frames = int(video_stream['nb_frames'])
# up, down = str(video_stream['r_frame_rate']).split('/')
# fps = eval(up) / eval(down)
# duration = float(video_stream['duration'])
bit_rate = int(video_stream['bit_rate'])/1000
print('width: {}'.format(width))
print('height: {}'.format(height))
# print('num_frames: {}'.format(num_frames))
print('bit_rate: {}k'.format(bit_rate))
# print('fps: {}'.format(fps))
# print('size: {}MB'.format(size))
# print('duration: {}'.format(duration))
return video_stream



except Exception as err:
print("aaaaaaaaaaaaaaaaaaaa", err)


if __name__ == '__main__':
file_path = 'rtmp://live.play.t-aaron.com/live/THSAk'
#file_path = 'https://vod.play.t-aaron.com/customerTrans/edc96ea2115a0723a003730956208134/40b416f7-183b57f6be0-0004-f90c-f2c-7ec68.mp4'
#file_path = 'https://vod.play.t-aaron.com/3301fc8e166f45be88f2214e7a8f4a9d/e29535365b54434d9ed2e8c3b0a175da-fba35541b31a1049ca05b145a283c33a-hd.mp4'
video_info = get_video_info(file_path)
print(json.dumps(video_info))
# total_frames = int(video_info['nb_frames'])
# print('总帧数:' + str(total_frames))
# random_frame = random.randint(1, total_frames)
# print('随机帧:' + str(random_frame))
# out = read_frame_as_jpeg(file_path, i)
# image_array = numpy.asarray(bytearray(out), dtype="uint8")
# image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
# kwargs={'fflags': 'nobuffer', 'flags': 'low_delay'}
kwargs={
# "hwaccel": "nvdec",
# "vcodec": "h264_cuvid",
# "c:v": "h264_cuvid"
}
output_args = {
# "vcodec": "hevc_nvenc",
# "c:v": "hevc_nvenc",
# "preset": "fast",
}
# i = 1
# process1 = (
# ffmpeg
# .input(file_path, **kwargs)
# .output('pipe:', format='rawvideo', pix_fmt='rgb24', **output_args)
# # .global_args("-an")
# .overwrite_output()
# .run_async(pipe_stdout=True, pipe_stderr=True)
# )
width = int(video_info['width'])
height = int(video_info['height'])
command = ['ffmpeg',
'-i', file_path,
'-f', 'rawvideo',
'-pix_fmt', 'bgr24',
'-vcodec','rawvideo',
# '-s', "{}x{}".format(int(width), int(height)),
'-an',
'-']
p = sp.Popen(command, stdout=sp.PIPE)
ai_video_file = cv2.VideoWriter(r"C:\Users\chenyukun\Desktop\shipin\aa.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 30,
(width, height))
start1 = time.time()
while True:
start = time.time()
in_bytes = p.stdout.read(width * height * 3)
if not in_bytes:
print(in_bytes)
ai_video_file.release()
p.stdout.close()
p.wait()
break
# 转成ndarray
in_frame = (np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3]))
# frame = cv2.resize(in_frame, (1280, 720)) # 改变图片尺寸
# frame = cv2.cvtColor(in_frame, cv2.COLOR_RGB2BGR) # 转成BGR
# i += 1
print(round(time.time()-start, 5))

ai_video_file.write(in_frame)
if time.time() - start1 > 60:
ai_video_file.release()
p.stdout.close()
p.wait()
break
# cv2.imshow('frame', frame)
# time.sleep(1111)
p.kill()

+ 291
- 44
test/ffmpeg2.py Vedi File

import copy
import subprocess as sp import subprocess as sp
from enum import Enum, unique
from PIL import Image from PIL import Image
import time import time
import cv2 import cv2
import oss2

import sys
sys.path.extend(['..','../AIlib' ])

from AI import AI_process, AI_process_forest, get_postProcess_para
import cv2,os,time
from segutils.segmodel import SegModel
from models.experimental import attempt_load
from utils.torch_utils import select_device
from utilsK.queRiver import get_labelnames,get_label_arrays
import numpy as np import numpy as np
# 推流
if __name__== "__main__":

cap = cv2.VideoCapture(r"https://vod.play.t-aaron.com/customerTrans/14d44756fa6d37db17008d98bdee3558/18ac4fa7-18369b0e703-0004-f90c-f2c-7ec68.mp4")

# Get video information
fps = int(cap.get(cv2.CAP_PROP_FPS))
print(fps)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
print(width)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(height)
# ffmpeg command
# command = ['D:/百度下载/ffmpeg-20200831-4a11a6f-win64-static/bin/ffmpeg.exe',
# '-y', # 不经过确认,输出时直接覆盖同名文件。
# '-f', 'rawvideo',
# '-vcodec','rawvideo',
# '-pix_fmt', 'bgr24',
# '-s', "{}x{}".format(width, height),
# # '-s', "{}x{}".format(1280, 720),
# '-i', '-', # 指定输入文件
# '-c:v', 'libx264', # 指定视频编码器
# '-pix_fmt', 'yuv420p',
# '-r', '15',
# '-g', '15',
# "-an",
# '-b:v', '3000k',
# '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
# '-f', 'flv',
# "rtmp://live.push.t-aaron.com/live/THSAk"]

# # 管道配置
# p = sp.Popen(command, stdin=sp.PIPE, shell=False)
while(cap.isOpened()):
start =time.time()
# ret, frame = cap.read()
cap.grab()
print(time.time()-start)
# if not ret:
# print("Opening camera is failed")
# break
# p.stdin.write(frame.tostring())
import torch
from utilsK.masterUtils import get_needed_objectsIndex





# 异常枚举
@unique
class ModelType(Enum):

WATER_SURFACE_MODEL = ("1", "001", "水面模型")

FOREST_FARM_MODEL = ("2", "002", "森林模型")

TRAFFIC_FARM_MODEL = ("3", "003", "交通模型")

def checkCode(code):
for model in ModelType:
if model.value[1] == code:
return True
return False
class ModelConfig():
def __init__(self):
postFile = '../AIlib/conf/para.json'
self.conf_thres, self.iou_thres, self.classes, self.rainbows = get_postProcess_para(postFile)


class SZModelConfig(ModelConfig):

def __init__(self):
super(SZModelConfig, self).__init__()
labelnames = "../AIlib/weights/yolov5/class8/labelnames.json" ##对应类别表
self.names = get_labelnames(labelnames)
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40,
fontpath="../AIlib/conf/platech.ttf")


class LCModelConfig(ModelConfig):
def __init__(self):
super(LCModelConfig, self).__init__()
labelnames = "../AIlib/weights/forest/labelnames.json"
self.names = get_labelnames(labelnames)
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40, fontpath="../AIlib/conf/platech.ttf")


class RFModelConfig(ModelConfig):
def __init__(self):
super(RFModelConfig, self).__init__()
labelnames = "../AIlib/weights/road/labelnames.json"
self.names = get_labelnames(labelnames)
imageW = 1536
outfontsize=int(imageW/1920*40)
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=outfontsize, fontpath="../AIlib/conf/platech.ttf")

class Model():
def __init__(self, device, allowedList=None):
##预先设置的参数
self.device_ = device ##选定模型,可选 cpu,'0','1'
self.allowedList = allowedList


# 水面模型
class SZModel(Model):
def __init__(self, device, allowedList=None):
super().__init__(device, allowedList)

self.device = select_device(self.device_)
self.half = self.device.type != 'cpu'
self.model = attempt_load("../AIlib/weights/yolov5/class8/bestcao.pt", map_location=self.device)
if self.half:
self.model.half()
self.segmodel = SegModel(nclass=2, weights='../AIlib/weights/STDC/model_maxmIOU75_1720_0.946_360640.pth',
device=self.device)

# names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process([frame], self.model, self.segmodel, config[0], config[1],
config[2], self.half, self.device, config[3], config[4],
self.allowedList)


# 森林模型
class LCModel(Model):
def __init__(self, device, allowedList=None):
super().__init__(device, allowedList)
self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' # half precision only supported on CUDA
self.model = attempt_load("../AIlib/weights/forest/best.pt", map_location=self.device) # load FP32 model
if self.half:
self.model.half()
self.segmodel = None

# names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1], config[2],
self.half, self.device, config[3], config[4], self.allowedList)

# 交通模型
class RFModel(Model):
def __init__(self, device, allowedList=None):
super().__init__(device, allowedList)
self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' # half precision only supported on CUDA
self.model = attempt_load("../AIlib/weights/road/best.pt", map_location=self.device) # load FP32 model
if self.half:
self.model.half()
self.segmodel = None

# names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1], config[2],
self.half, self.device, config[3], config[4], self.allowedList)

def get_model(args):
for model in args[2]:
try:
code = '001'
needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")]
if code == ModelType.WATER_SURFACE_MODEL.value[1]:
return SZModel(args[1], needed_objectsIndex), code, args[0].get("sz")
elif code == ModelType.FOREST_FARM_MODEL.value[1]:
return LCModel(args[1], needed_objectsIndex), code, args[0].get("lc")
elif code == ModelType.TRAFFIC_FARM_MODEL.value[1]:
return RFModel(args[1], needed_objectsIndex), code, args[0].get("rf")
else:
raise Exception("11111")
except Exception as e:
raise Exception("22222")
class PictureWaterMark():

def common_water(self, image, logo):
width, height = image.shape[1], image.shape[0]
mark_width, mark_height = logo.shape[1], logo.shape[0]
rate = int(width * 0.2) / mark_width
logo_new = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
position = (int(width * 0.95 - logo_new.shape[1]), int(height * 0.95 - logo_new.shape[0]))
b = Image.new('RGBA', (width, height), (0, 0, 0, 0)) # 创建新图像:透明'
a = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
watermark = Image.fromarray(cv2.cvtColor(logo_new, cv2.COLOR_BGRA2RGBA))
# 图片旋转
# watermark = watermark.rotate(45)
b.paste(a, (0, 0))
b.paste(watermark, position, mask=watermark)
return cv2.cvtColor(np.asarray(b), cv2.COLOR_BGR2RGB)

def common_water_1(self, image, logo, alpha=1):
h, w = image.shape[0], image.shape[1]
if w >= h:
rate = int(w * 0.1) / logo.shape[1]
else:
rate = int(h * 0.1) / logo.shape[0]
mask = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
mask_h, mask_w = mask.shape[0], mask.shape[1]
mask_channels = cv2.split(mask)
dst_channels = cv2.split(image)
# b, g, r, a = cv2.split(mask)
# 计算mask在图片的坐标
ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w))
dr_points = (int(h * 0.95), int(w - h * 0.05))
for i in range(3):
dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] = dst_channels[i][
ul_points[0]: dr_points[0],
ul_points[1]: dr_points[1]] * (
255.0 - mask_channels[3] * alpha) / 255
dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] += np.array(
mask_channels[i] * (mask_channels[3] * alpha / 255), dtype=np.uint8)
dst_img = cv2.merge(dst_channels)
return dst_img
def video_merge(frame1, frame2, width, height):
frameLeft = cv2.resize(frame1, (width, height), interpolation=cv2.INTER_LINEAR)
frameRight = cv2.resize(frame2, (width, height), interpolation=cv2.INTER_LINEAR)
frame_merge = np.hstack((frameLeft, frameRight))
# frame_merge = np.hstack((frame1, frame2))
return frame_merge
cap = cv2.VideoCapture("/home/DATA/chenyukun/3.mp4")

# Get video information
fps = int(cap.get(cv2.CAP_PROP_FPS))
print(fps)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
print(width)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(height)
# command = ['ffmpeg',
# '-y', # 不经过确认,输出时直接覆盖同名文件。
# '-f', 'rawvideo',
# '-vcodec', 'rawvideo',
# '-pix_fmt', 'bgr24',
# # '-s', "{}x{}".format(self.width * 2, self.height),
# '-s', "{}x{}".format(width, height),
# '-r', str(15),
# '-i', '-', # 指定输入文件
# '-g', '15',
# '-sc_threshold', '0', # 使得GOP的插入更加均匀
# '-b:v', '3000k', # 指定码率
# '-tune', 'zerolatency', # 加速编码速度
# '-c:v', 'libx264', # 指定视频编码器
# '-pix_fmt', 'yuv420p',
# "-an",
# '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
# '-f', 'flv',
# "rtmp://live.push.t-aaron.com/live/THSAk"]
#
# # 管道配置
# p = sp.Popen(command, stdin=sp.PIPE, shell=False)

sz = SZModelConfig()
lc = LCModelConfig()
rf = RFModelConfig()
config = {
"sz": (sz.names, sz.label_arraylist, sz.rainbows, sz.conf_thres, sz.iou_thres),
"lc": (lc.names, lc.label_arraylist, lc.rainbows, lc.conf_thres, lc.iou_thres),
"rf": (rf.names, rf.label_arraylist, rf.rainbows, rf.conf_thres, rf.iou_thres),
}
model = {
"models": [
{
"code": "001",
"categories": [
{
"id": "0",
"config": {}
},
{
"id": "1",
"config": {}
},
{
"id": "2",
"config": {}
},
{
"id": "3",
"config": {}
},
{
"id": "4",
"config": {}
},
{
"id": "5",
"config": {}
},
{
"id": "6",
"config": {}
},
{
"id": "7",
"config": {}
}
]
}]
}

mod, model_type_code, modelConfig = get_model((config, str(1), model.get("models")))
pic = PictureWaterMark()
logo = cv2.imread("./image/logo.png", -1)
ai_video_file = cv2.VideoWriter("/home/DATA/chenyukun/aa/1.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (width*2, height))
while(cap.isOpened()):
start =time.time()
ret, frame = cap.read()
# cap.grab()
if not ret:
print("Opening camera is failed")
break
p_result, timeOut = mod.process(copy.deepcopy(frame), modelConfig)
frame = pic.common_water_1(frame, logo)
p_result[1] = pic.common_water_1(p_result[1], logo)
frame_merge = video_merge(frame, p_result[1], width, height)
ai_video_file.write(frame_merge)
print(time.time()-start)
ai_video_file.release()
cap.release()






+ 297
- 0
test/ffmpeg3.py Vedi File

import copy
import subprocess as sp
from enum import Enum, unique
from PIL import Image
import time
import cv2

import sys
sys.path.extend(['..','../AIlib' ])

from AI import AI_process, AI_process_forest, get_postProcess_para
import cv2,os,time
from segutils.segmodel import SegModel
from models.experimental import attempt_load
from utils.torch_utils import select_device
from utilsK.queRiver import get_labelnames,get_label_arrays
import numpy as np
import torch
from utilsK.masterUtils import get_needed_objectsIndex





# 异常枚举
@unique
class ModelType(Enum):

WATER_SURFACE_MODEL = ("1", "001", "水面模型")

FOREST_FARM_MODEL = ("2", "002", "森林模型")

TRAFFIC_FARM_MODEL = ("3", "003", "交通模型")

def checkCode(code):
for model in ModelType:
if model.value[1] == code:
return True
return False
class ModelConfig():
def __init__(self):
postFile = '../AIlib/conf/para.json'
self.conf_thres, self.iou_thres, self.classes, self.rainbows = get_postProcess_para(postFile)


class SZModelConfig(ModelConfig):

def __init__(self):
super(SZModelConfig, self).__init__()
labelnames = "../AIlib/weights/yolov5/class8/labelnames.json" ##对应类别表
self.names = get_labelnames(labelnames)
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40,
fontpath="../AIlib/conf/platech.ttf")


class LCModelConfig(ModelConfig):
def __init__(self):
super(LCModelConfig, self).__init__()
labelnames = "../AIlib/weights/forest/labelnames.json"
self.names = get_labelnames(labelnames)
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40, fontpath="../AIlib/conf/platech.ttf")


class RFModelConfig(ModelConfig):
def __init__(self):
super(RFModelConfig, self).__init__()
labelnames = "../AIlib/weights/road/labelnames.json"
self.names = get_labelnames(labelnames)
imageW = 1536
outfontsize=int(imageW/1920*40)
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=outfontsize, fontpath="../AIlib/conf/platech.ttf")

class Model():
def __init__(self, device, allowedList=None):
##预先设置的参数
self.device_ = device ##选定模型,可选 cpu,'0','1'
self.allowedList = allowedList


# 水面模型
class SZModel(Model):
def __init__(self, device, allowedList=None):
super().__init__(device, allowedList)

self.device = select_device(self.device_)
self.half = self.device.type != 'cpu'
self.model = attempt_load("../AIlib/weights/yolov5/class8/bestcao.pt", map_location=self.device)
if self.half:
self.model.half()
self.segmodel = SegModel(nclass=2, weights='../AIlib/weights/STDC/model_maxmIOU75_1720_0.946_360640.pth',
device=self.device)

# names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process([frame], self.model, self.segmodel, config[0], config[1],
config[2], self.half, self.device, config[3], config[4],
self.allowedList)


# 森林模型
class LCModel(Model):
def __init__(self, device, allowedList=None):
super().__init__(device, allowedList)
self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' # half precision only supported on CUDA
self.model = attempt_load("../AIlib/weights/forest/best.pt", map_location=self.device) # load FP32 model
if self.half:
self.model.half()
self.segmodel = None

# names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1], config[2],
self.half, self.device, config[3], config[4], self.allowedList)

# 交通模型
class RFModel(Model):
def __init__(self, device, allowedList=None):
super().__init__(device, allowedList)
self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' # half precision only supported on CUDA
self.model = attempt_load("../AIlib/weights/road/best.pt", map_location=self.device) # load FP32 model
if self.half:
self.model.half()
self.segmodel = None

# names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1], config[2],
self.half, self.device, config[3], config[4], self.allowedList)

def get_model(args):
for model in args[2]:
try:
code = '001'
needed_objectsIndex = [int(category.get("id")) for category in model.get("categories")]
if code == ModelType.WATER_SURFACE_MODEL.value[1]:
return SZModel(args[1], needed_objectsIndex), code, args[0].get("sz")
elif code == ModelType.FOREST_FARM_MODEL.value[1]:
return LCModel(args[1], needed_objectsIndex), code, args[0].get("lc")
elif code == ModelType.TRAFFIC_FARM_MODEL.value[1]:
return RFModel(args[1], needed_objectsIndex), code, args[0].get("rf")
else:
raise Exception("11111")
except Exception as e:
raise Exception("22222")
class PictureWaterMark():

def common_water(self, image, logo):
width, height = image.shape[1], image.shape[0]
mark_width, mark_height = logo.shape[1], logo.shape[0]
rate = int(width * 0.2) / mark_width
logo_new = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
position = (int(width * 0.95 - logo_new.shape[1]), int(height * 0.95 - logo_new.shape[0]))
b = Image.new('RGBA', (width, height), (0, 0, 0, 0)) # 创建新图像:透明'
a = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
watermark = Image.fromarray(cv2.cvtColor(logo_new, cv2.COLOR_BGRA2RGBA))
# 图片旋转
# watermark = watermark.rotate(45)
b.paste(a, (0, 0))
b.paste(watermark, position, mask=watermark)
return cv2.cvtColor(np.asarray(b), cv2.COLOR_BGR2RGB)

def common_water_1(self, image, logo, alpha=1):
h, w = image.shape[0], image.shape[1]
if w >= h:
rate = int(w * 0.1) / logo.shape[1]
else:
rate = int(h * 0.1) / logo.shape[0]
mask = cv2.resize(logo, None, fx=rate, fy=rate, interpolation=cv2.INTER_NEAREST)
mask_h, mask_w = mask.shape[0], mask.shape[1]
mask_channels = cv2.split(mask)
dst_channels = cv2.split(image)
# b, g, r, a = cv2.split(mask)
# 计算mask在图片的坐标
ul_points = (int(h * 0.95) - mask_h, int(w - h * 0.05 - mask_w))
dr_points = (int(h * 0.95), int(w - h * 0.05))
for i in range(3):
dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] = dst_channels[i][
ul_points[0]: dr_points[0],
ul_points[1]: dr_points[1]] * (
255.0 - mask_channels[3] * alpha) / 255
dst_channels[i][ul_points[0]: dr_points[0], ul_points[1]: dr_points[1]] += np.array(
mask_channels[i] * (mask_channels[3] * alpha / 255), dtype=np.uint8)
dst_img = cv2.merge(dst_channels)
return dst_img
def video_merge(frame1, frame2, width, height):
frameLeft = cv2.resize(frame1, (width, height), interpolation=cv2.INTER_LINEAR)
frameRight = cv2.resize(frame2, (width, height), interpolation=cv2.INTER_LINEAR)
frame_merge = np.hstack((frameLeft, frameRight))
# frame_merge = np.hstack((frame1, frame2))
return frame_merge
cap = cv2.VideoCapture("/home/DATA/chenyukun/4.mp4")

# Get video information
fps = int(cap.get(cv2.CAP_PROP_FPS))
print(fps)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
print(width)
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(height)
# command = ['ffmpeg',
# '-y', # 不经过确认,输出时直接覆盖同名文件。
# '-f', 'rawvideo',
# '-vcodec', 'rawvideo',
# '-pix_fmt', 'bgr24',
# # '-s', "{}x{}".format(self.width * 2, self.height),
# '-s', "{}x{}".format(width, height),
# '-r', str(15),
# '-i', '-', # 指定输入文件
# '-g', '15',
# '-sc_threshold', '0', # 使得GOP的插入更加均匀
# '-b:v', '3000k', # 指定码率
# '-tune', 'zerolatency', # 加速编码速度
# '-c:v', 'libx264', # 指定视频编码器
# '-pix_fmt', 'yuv420p',
# "-an",
# '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
# '-f', 'flv',
# "rtmp://live.push.t-aaron.com/live/THSAk"]
#
# # 管道配置
# p = sp.Popen(command, stdin=sp.PIPE, shell=False)

sz = SZModelConfig()
lc = LCModelConfig()
rf = RFModelConfig()
config = {
"sz": (sz.names, sz.label_arraylist, sz.rainbows, sz.conf_thres, sz.iou_thres),
"lc": (lc.names, lc.label_arraylist, lc.rainbows, lc.conf_thres, lc.iou_thres),
"rf": (rf.names, rf.label_arraylist, rf.rainbows, rf.conf_thres, rf.iou_thres),
}
model = {
"models": [
{
"code": "001",
"categories": [
{
"id": "0",
"config": {}
},
{
"id": "1",
"config": {}
},
{
"id": "2",
"config": {}
},
{
"id": "3",
"config": {}
},
{
"id": "4",
"config": {}
},
{
"id": "5",
"config": {}
},
{
"id": "6",
"config": {}
},
{
"id": "7",
"config": {}
}
]
}]
}

mod, model_type_code, modelConfig = get_model((config, str(0), model.get("models")))
pic = PictureWaterMark()
logo = cv2.imread("./image/logo.png", -1)
ai_video_file = cv2.VideoWriter("/home/DATA/chenyukun/aa/2.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (width*2, height))
while(cap.isOpened()):
start =time.time()
ret, frame = cap.read()
# cap.grab()
if not ret:
print("Opening camera is failed")
break
p_result, timeOut = mod.process(copy.deepcopy(frame), modelConfig)
frame = pic.common_water_1(frame, logo)
p_result[1] = pic.common_water_1(p_result[1], logo)
frame_merge = video_merge(frame, p_result[1], width, height)
ai_video_file.write(frame_merge)
print(time.time()-start)
ai_video_file.release()
cap.release()





BIN
test/image/1.jpg Vedi File

Before After
Width: 640  |  Height: 360  |  Size: 113KB

BIN
test/image/2.jpg Vedi File

Before After
Width: 640  |  Height: 360  |  Size: 119KB

+ 70
- 49
test/producer_start.py Vedi File

import json import json
import threading import threading


topicName = 'dsp-alg-online-tasks'
eBody = {
"request_id": "d4c909912ac741ce81ccef03fd1b2ec45",
"models": [
{
"code": "001",
"categories": [{
"id": "0",
"config": {}
},
{
"id": "1",
"config": {}
},
{
"id": "2",
"config": {}
},
{
"id": "3",
"config": {}
}]
}],
"command": "start",
"pull_url": "rtmp://live.play.t-aaron.com/live/THSAj_hd",
"push_url": "rtmp://live.push.t-aaron.com/live/THSAk",
"results_base_dir": "P20220802133841159"
}
def on_send_success(record_metadata,aaa, ad):
print("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}", record_metadata, aaa)

def on_send_error(excp):
print(excp)
producer = KafkaProducer(bootstrap_servers=['101.132.127.1:19092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody).add_callback(
on_send_success, "sdasd", "1111").add_errback(on_send_error)
result = future.get(timeout=10)
print(result)

# topicName = 'dsp-alg-offline-tasks'
# topicName = 'dsp-alg-online-tasks'
# eBody = { # eBody = {
# "request_id": "d4c909912ac741ce81ccef03fd1b2ec45", # "request_id": "d4c909912ac741ce81ccef03fd1b2ec45",
# "models": [ # "models": [
# {
# {
# "code": "001",
# "categories": [{
# "id": "0", # "id": "0",
# "config": {} # "config": {}
# }, # },
# { # {
# "id": "3", # "id": "3",
# "config": {} # "config": {}
# }
# ],
# }]
# }],
# "command": "start", # "command": "start",
# "original_url": "https://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/5abebc0b-1829c2b0c66-0004-f90c-f2c-7ec68.mp4",
# "original_type": ".mp4",
# "pull_url": "rtmp://live.play.t-aaron.com/live/THSAj_hd",
# "push_url": "rtmp://live.push.t-aaron.com/live/THSAk",
# "results_base_dir": "P20220802133841159" # "results_base_dir": "P20220802133841159"
# } # }
# producer = KafkaProducer(bootstrap_servers=['192.168.10.11:9092'],
# def on_send_success(record_metadata,aaa, ad):
# print("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}", record_metadata, aaa)
#
# def on_send_error(excp):
# print(excp)
# producer = KafkaProducer(bootstrap_servers=['101.132.127.1:19092'],
# value_serializer=lambda m: json.dumps(m).encode('utf-8')) # value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody)
# future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody).add_callback(
# on_send_success, "sdasd", "1111").add_errback(on_send_error)
# result = future.get(timeout=10) # result = future.get(timeout=10)
# print(result)
# print(result)

topicName = 'dsp-alg-offline-tasks'
eBody = {
"request_id": "d4c909912ac741ce81ccef03fd1b2ec46",
"models": [
{
"code": "001",
"categories": [
{
"id": "0",
"config": {}
},
{
"id": "1",
"config": {}
},
{
"id": "2",
"config": {}
},
{
"id": "3",
"config": {}
},
{
"id": "4",
"config": {}
},
{
"id": "5",
"config": {}
},
{
"id": "6",
"config": {}
},
{
"id": "7",
"config": {}
}
]
}],
"command": "start",
"original_url": "https://vod.play.t-aaron.com/430122abaedc42188e73763b57e33c3c/cd64c5ca5c454c84859d86e7dbaef7c8-e721e81dba2469bca32bce44ee238c44-hd.mp4",
"original_type": ".mp4",
"push_url": "rtmp://live.push.t-aaron.com/live/THSAa",
"results_base_dir": "P20220802133841159"
}
producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody)
result = future.get(timeout=10)
print(result)

+ 13
- 13
test/余弦相似度计算.py Vedi File



if __name__ == '__main__': if __name__ == '__main__':
# 把图片表示成一个向量,通过计算向量之间的余弦距离来表征两张图片的相似度 0.9760 # 把图片表示成一个向量,通过计算向量之间的余弦距离来表征两张图片的相似度 0.9760
image1 = cv2.imread('image/AI.jpg')
image2 = cv2.imread('image/AI1.jpg')
image3 = cv2.imread('image/AI2.jpg')
image4 = cv2.imread('image/AI3.jpg')
image5 = cv2.imread('image/AI4.jpg')
image6 = cv2.imread('a.jpg')
image7 = cv2.imread('AI.jpg')
image1 = cv2.imread('image/1.jpg')
image2 = cv2.imread('image/1.jpg')
# image3 = cv2.imread('image/AI2.jpg')
# image4 = cv2.imread('image/AI3.jpg')
# image5 = cv2.imread('image/AI4.jpg')
# image6 = cv2.imread('a.jpg')
# image7 = cv2.imread('AI.jpg')
hash1 = dHash(image1) hash1 = dHash(image1)
hash2 = dHash(image2) hash2 = dHash(image2)
hash3 = dHash(image3)
hash4 = dHash(image4)
hash5 = dHash(image5)
hash6 = dHash(image6)
hash7 = dHash(image7)
# hash3 = dHash(image3)
# hash4 = dHash(image4)
# hash5 = dHash(image5)
# hash6 = dHash(image6)
# hash7 = dHash(image7)
start = time.time() start = time.time()
dist = Hamming_distance(hash6, hash7)
dist = Hamming_distance(hash1, hash2)
#将距离转化为相似度 #将距离转化为相似度
similarity = 1 - dist * 1.0 / 64 similarity = 1 - dist * 1.0 / 64
print(dist) print(dist)

+ 115
- 11
util/Cv2Utils.py Vedi File

# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import cv2 import cv2
import subprocess as sp import subprocess as sp

import ffmpeg
import numpy as np import numpy as np
from loguru import logger from loguru import logger
from exception.CustomerException import ServiceException from exception.CustomerException import ServiceException
self.fps = None self.fps = None
self.width = None self.width = None
self.height = None self.height = None
self.all_frames = None
self.bit_rate = None
self.pull_p = None
self.requestId = requestId self.requestId = requestId


def stop_cv2(self):
'''
获取视频信息
'''
def get_video_info(self):
try:
if self.pullUrl is None:
logger.error("拉流地址不能为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
probe = ffmpeg.probe(self.pullUrl)
# 视频大小
# format = probe['format']
# size = int(format['size'])/1024/1024
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
if video_stream is None:
logger.error("根据拉流地址未获取到视频流, requestId:{}", self.requestId)
return
width = video_stream.get('width')
height = video_stream.get('height')
nb_frames = video_stream.get('nb_frames')
fps = video_stream.get('r_frame_rate')
# duration = video_stream.get('duration')
bit_rate = video_stream.get('bit_rate')
if width:
self.width = int(width)
if height:
self.height = int(height)
if nb_frames:
self.all_frames = int(nb_frames)
if fps:
up, down = str(fps).split('/')
self.fps = eval(up) / eval(down)
# if duration:
# self.duration = float(video_stream['duration'])
if bit_rate:
self.bit_rate = int(bit_rate)/1000
logger.info("视频信息, width:{}|height:{}|fps:{}|all_frames:{}|bit_rate:{}, requestId:{}", self.width,
self.height, self.fps, self.all_frames, self.bit_rate, self.requestId)
except ServiceException as s:
logger.exception("获取视频信息异常: {}, requestId:{}", s.msg, self.requestId)
raise s
except Exception as e:
logger.exception("获取视频信息异常:{}, requestId:{}", e, self.requestId)

'''
拉取视频
'''
def build_pull_p(self):
try:
if self.pull_p:
logger.info("重试, 关闭拉流管道, requestId:{}", self.requestId)
self.pull_p.stdout.close()
self.pull_p.terminate()
self.pull_p.wait()
if self.pullUrl is None:
logger.error("拉流地址不能为空, requestId:{}", self.requestId)
raise ServiceException(ExceptionType.PULL_STREAM_URL_EXCEPTION.value[0],
ExceptionType.PULL_STREAM_URL_EXCEPTION.value[1])
command = ['ffmpeg',
# '-b:v', '3000k',
'-i', self.pullUrl,
'-f', 'rawvideo',
'-vcodec','rawvideo',
'-pix_fmt', 'bgr24',
# '-s', "{}x{}".format(int(width), int(height)),
'-an',
'-']
self.pull_p = sp.Popen(command, stdout=sp.PIPE)
except ServiceException as s:
logger.exception("构建拉流管道异常: {}, requestId:{}", s, self.requestId)
raise s
except Exception as e:
logger.exception("构建拉流管道异常:{}, requestId:{}", e, self.requestId)

def checkconfig(self):
if self.fps is None or self.width is None or self.height is None:
return True
return False

def read(self):
in_bytes = self.pull_p.stdout.read(self.width * self.height * 3)

# if not in_bytes:
# p.stdout.close()
# p.wait()
# break
# # 转成ndarray
# in_frame = (np.frombuffer(in_bytes, np.uint8).reshape([height, width, 3]))

def close(self):
if self.pull_p:
self.pull_p.stdin.close()
self.pull_p.terminate()
self.pull_p.wait()
logger.info("关闭拉流管道完成, requestId:{}", self.requestId)
if self.p: if self.p:
self.p.stdin.close() self.p.stdin.close()
self.p.terminate() self.p.terminate()
if self.ai_video_file: if self.ai_video_file:
self.ai_video_file.release() self.ai_video_file.release()
logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId) logger.info("关闭AI视频写入流完成, requestId:{}", self.requestId)
if self.cap:
self.cap.release()
logger.info("关闭cv2 cap完成, requestId:{}", self.requestId)


# 构建 cv2 # 构建 cv2
def build_cv2(self): def build_cv2(self):
if self.height is None or self.height == 0: if self.height is None or self.height == 0:
self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
command = ['/usr/bin/ffmpeg', command = ['/usr/bin/ffmpeg',
'-y', # 不经过确认,输出时直接覆盖同名文件。
# '-y', # 不经过确认,输出时直接覆盖同名文件。
'-f', 'rawvideo', '-f', 'rawvideo',
'-vcodec', 'rawvideo', '-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-pix_fmt', 'bgr24', # 显示可用的像素格式
# '-s', "{}x{}".format(self.width * 2, self.height), # '-s', "{}x{}".format(self.width * 2, self.height),
'-s', "{}x{}".format(int(self.width), int(self.height/2)), '-s', "{}x{}".format(int(self.width), int(self.height/2)),
'-r', str(15),
# '-r', str(15),
'-i', '-', # 指定输入文件 '-i', '-', # 指定输入文件
'-g', '15',
'-g', '25',
'-b:v', '3000k', '-b:v', '3000k',
'-tune', 'zerolatency', # 加速编码速度
'-c:v', 'libx264', # 指定视频编码器 '-c:v', 'libx264', # 指定视频编码器
'-sc_threshold', '0',
'-pix_fmt', 'yuv420p', '-pix_fmt', 'yuv420p',
'-an', '-an',
'-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast, '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# 构建 cv2 # 构建 cv2
def build_p(self): def build_p(self):
try: try:
if self.p is not None:
if self.p:
logger.info("重试, 关闭管道, requestId:{}", self.requestId) logger.info("重试, 关闭管道, requestId:{}", self.requestId)
self.p.stdin.close() self.p.stdin.close()
self.p.terminate() self.p.terminate()
'-pix_fmt', 'bgr24', '-pix_fmt', 'bgr24',
# '-s', "{}x{}".format(self.width * 2, self.height), # '-s', "{}x{}".format(self.width * 2, self.height),
'-s', "{}x{}".format(int(self.width), int(self.height/2)), '-s', "{}x{}".format(int(self.width), int(self.height/2)),
'-r', str(15),
# '-r', str(15),
'-i', '-', # 指定输入文件 '-i', '-', # 指定输入文件
'-g', '15',
'-g', '5',
'-b:v', '3000k', '-b:v', '3000k',
'-bufsize', '3000k',
'-tune', 'zerolatency', # 加速编码速度
'-c:v', 'libx264', # 指定视频编码器 '-c:v', 'libx264', # 指定视频编码器
# '-sc_threshold', '0',
'-pix_fmt', 'yuv420p', '-pix_fmt', 'yuv420p',
"-an", "-an",
'-flvflags', 'no_duration_filesize',
'-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast, '-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast,
# superfast, veryfast, faster, fast, medium, slow, slower, veryslow。 # superfast, veryfast, faster, fast, medium, slow, slower, veryslow。
'-f', 'flv', '-f', 'flv',


def build_write(self, is_online=True): def build_write(self, is_online=True):
try: try:
if self.fps is None or self.width is None or self.height is None:
return
if is_online: if is_online:
self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps, self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), self.fps,
(self.width, self.height)) (self.width, self.height))

+ 6
- 6
util/KafkaUtils.py Vedi File

logger.error("消费者订阅topic不能为空!") logger.error("消费者订阅topic不能为空!")
raise Exception("消费者订阅topic不能为空!") raise Exception("消费者订阅topic不能为空!")
# 手动配置分区 # 手动配置分区
# customer_partition = []
# for topic in topics:
# for p in self.content["kafka"]["topic"][topic]["partition"]:
# customer_partition.append(TopicPartition(topic, p))
# self.customerConsumer.assign(customer_partition)
customer_partition = []
for topic in topics:
for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
customer_partition.append(TopicPartition(topic, p))
self.customerConsumer.assign(customer_partition)
# 自动配置 # 自动配置
self.customerConsumer.subscribe(topics=topics)
# self.customerConsumer.subscribe(topics=topics)
logger.info("kafka生产者订阅topic完成") logger.info("kafka生产者订阅topic完成")


def commit_offset(self, message): def commit_offset(self, message):

+ 56
- 10
util/ModelUtils.py Vedi File

from segutils.segmodel import SegModel, get_largest_contours from segutils.segmodel import SegModel, get_largest_contours
from utils.torch_utils import select_device from utils.torch_utils import select_device
from models.experimental import attempt_load from models.experimental import attempt_load
from utilsK.queRiver import get_labelnames, get_label_arrays, post_process_
from AI import AI_process, AI_process_forest, get_postProcess_para from AI import AI_process, AI_process_forest, get_postProcess_para




##预先设置的参数 ##预先设置的参数
self.device_ = device ##选定模型,可选 cpu,'0','1' self.device_ = device ##选定模型,可选 cpu,'0','1'
self.allowedList = allowedList self.allowedList = allowedList
postFile = '../AIlib/conf/para.json'
self.conf_thres, self.iou_thres, self.classes, self.rainbows = get_postProcess_para(postFile)




# 水面模型 # 水面模型
class SZModel(Model): class SZModel(Model):
def __init__(self, device, allowedList=None): def __init__(self, device, allowedList=None):
super().__init__(device, allowedList) super().__init__(device, allowedList)
self.label_arraylist = None
self.digitFont = None
labelnames = "../AIlib/weights/yolov5/class8/labelnames.json" ##对应类别表
self.names = get_labelnames(labelnames)



self.device = select_device(self.device_) self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' self.half = self.device.type != 'cpu'
device=self.device) device=self.device)


# names, label_arraylist, rainbows, conf_thres, iou_thres # names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process([frame], self.model, self.segmodel, config[0], config[1],
config[2], self.half, self.device, config[3], config[4],
self.allowedList)
def process(self, frame, width=1920):
if self.label_arraylist is None:
fontsize = int(width/1920*40)
line_thickness = 1
if width > 1280:
line_thickness = int(round(width/1920*3))
waterLineWidth = int(round(width/1920*3) + 1)
numFontSize = float(format(width/1920*1.1, '.1f'))
self.digitFont = {'line_thickness': line_thickness, 'fontSize': numFontSize, 'waterLineColor':(0,255,255), 'waterLineWidth': waterLineWidth}
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=fontsize,
fontpath="../AIlib/conf/platech.ttf")
return AI_process([frame], self.model, self.segmodel, self.names, self.label_arraylist,
self.rainbows, self.half, self.device, self.conf_thres, self.iou_thres,
self.allowedList, font=self.digitFont)




# 森林模型 # 森林模型
class LCModel(Model): class LCModel(Model):
def __init__(self, device, allowedList=None): def __init__(self, device, allowedList=None):
super().__init__(device, allowedList) super().__init__(device, allowedList)
self.label_arraylist = None
labelnames = "../AIlib/weights/forest/labelnames.json"
self.names = get_labelnames(labelnames)

self.device = select_device(self.device_) self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' # half precision only supported on CUDA self.half = self.device.type != 'cpu' # half precision only supported on CUDA
self.model = attempt_load("../AIlib/weights/forest/best.pt", map_location=self.device) # load FP32 model self.model = attempt_load("../AIlib/weights/forest/best.pt", map_location=self.device) # load FP32 model
self.segmodel = None self.segmodel = None


# names, label_arraylist, rainbows, conf_thres, iou_thres # names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1], config[2],
self.half, self.device, config[3], config[4], self.allowedList)
def process(self, frame, width=1920):
if self.label_arraylist is None:
fontsize = int(width/1920*40)
line_thickness = 1
if width > 1280:
line_thickness = int(round(width/1920*3))
waterLineWidth = int(round(width/1920*3) + 1)
numFontSize = float(format(width/1920*1.1, '.1f'))
self.digitFont = {'line_thickness': line_thickness, 'fontSize': numFontSize, 'waterLineColor':(0,255,255), 'waterLineWidth': waterLineWidth} ###数字显示的线宽度,大小; 如果都是None,则采用默认大小
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=fontsize, fontpath="../AIlib/conf/platech.ttf")
return AI_process_forest([frame], self.model, self.segmodel, self.names, self.label_arraylist,
self.rainbows, self.half, self.device, self.conf_thres, self.iou_thres,
self.allowedList, font=self.digitFont)


# 交通模型 # 交通模型
class RFModel(Model): class RFModel(Model):
def __init__(self, device, allowedList=None): def __init__(self, device, allowedList=None):
super().__init__(device, allowedList) super().__init__(device, allowedList)
self.label_arraylist = None
labelnames = "../AIlib/weights/road/labelnames.json"
self.names = get_labelnames(labelnames)

self.device = select_device(self.device_) self.device = select_device(self.device_)
self.half = self.device.type != 'cpu' # half precision only supported on CUDA self.half = self.device.type != 'cpu' # half precision only supported on CUDA
self.model = attempt_load("../AIlib/weights/road/best.pt", map_location=self.device) # load FP32 model self.model = attempt_load("../AIlib/weights/road/best.pt", map_location=self.device) # load FP32 model
self.segmodel = None self.segmodel = None


# names, label_arraylist, rainbows, conf_thres, iou_thres # names, label_arraylist, rainbows, conf_thres, iou_thres
def process(self, frame, config):
return AI_process_forest([frame], self.model, self.segmodel, config[0], config[1], config[2],
self.half, self.device, config[3], config[4], self.allowedList)
def process(self, frame, width=1920):
if self.label_arraylist is None:
fontsize = int(width/1920*40)
line_thickness = 1
if width > 1280:
line_thickness = int(round(width/1920*3))
waterLineWidth = int(round(width/1920*3) + 1)
numFontSize = float(format(width/1920*1.1, '.1f'))
self.digitFont = {'line_thickness': line_thickness, 'fontSize': numFontSize, 'waterLineColor':(0,255,255), 'waterLineWidth': waterLineWidth} ###数字显示的线宽度,大小; 如果都是None,则采用默认大小
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=fontsize, fontpath="../AIlib/conf/platech.ttf")
return AI_process_forest([frame], self.model, self.segmodel, self.names, self.label_arraylist,
self.rainbows, self.half, self.device, self.conf_thres, self.iou_thres,
self.allowedList, font=self.digitFont)

Loading…
Annulla
Salva