@@ -0,0 +1,8 @@ | |||
# 配置文件名称 | |||
APPLICATION_CONFIG="dsp_application.yml" | |||
# 编码格式 | |||
UTF_8="utf-8" | |||
# 文件读模式 | |||
R='r' |
@@ -0,0 +1,97 @@ | |||
# -*- coding: utf-8 -*- | |||
import json | |||
from multiprocessing import Process, Queue | |||
from util import LogUtils | |||
from util import KafkaUtils | |||
from loguru import logger | |||
from concurrency.FileUpdateThread import ImageFileUpdate | |||
from concurrency.HeartbeatThread import Heartbeat | |||
import time | |||
class CommonProcess(Process): | |||
def __init__(self, fbQueue, content, msg, imageQueue, mode_service): | |||
super().__init__() | |||
self.fbQueue = fbQueue | |||
self.content = content | |||
self.msg = msg | |||
self.mode_service = mode_service | |||
self.imageQueue = imageQueue | |||
self.eventQueue = Queue() | |||
# 给本进程发送事件 | |||
def sendEvent(self, eBody): | |||
self.eventQueue.put(eBody) | |||
# 获取下一个事件 | |||
def getEvent(self): | |||
eBody = None | |||
try: | |||
eBody = self.eventQueue.get(block=False) | |||
except Exception as e: | |||
pass | |||
return eBody | |||
def getFeedback(self): | |||
eBody = None | |||
try: | |||
eBody = self.fbQueue.get(block=False) | |||
except Exception as e: | |||
pass | |||
return eBody | |||
def run(self): | |||
# 初始化日志配置 | |||
LogUtils.init_log(self.content) | |||
logger.info("心跳、图片上传,反馈进程开始执行, requestId:{}", self.msg.get("request_id")) | |||
# 启动心跳线程 | |||
hb = Heartbeat(self.fbQueue, self.msg.get("request_id"), self.mode_service) | |||
hb.start() | |||
# 图片上传线程 | |||
imageFileUpdate = ImageFileUpdate(self.fbQueue, self.content, self.msg, self.imageQueue, self.mode_service) | |||
imageFileUpdate.start() | |||
kafkaProducer = KafkaUtils.CustomerKafkaProducer(self.content) | |||
# 心跳线程检测 | |||
heartbeat_num = 0 | |||
# 图片上传线程检测 | |||
imageFileUpdate_num = 0 | |||
while True: | |||
try: | |||
fb = self.getFeedback() | |||
if fb is not None and len(fb) > 0: | |||
logger.info("发送结果反馈:{}", json.dumps(fb)) | |||
kafkaProducer.get_producer() | |||
kafkaProducer.sender(self.content["kafka"]["topic"]["dsp-alg-results-topic"], | |||
fb["request_id"], fb, 1) | |||
if heartbeat_num == 0 and not hb.is_alive(): | |||
logger.error("未检测到心跳线程活动,心跳线程可能出现异常") | |||
break | |||
if imageFileUpdate_num == 0 and not imageFileUpdate.is_alive(): | |||
logger.error("未检测到图片上传线程活动,图片上传线程可能出现异常") | |||
break | |||
eBody = self.getEvent() | |||
if eBody is not None and len(eBody) > 0: | |||
cmdStr = eBody.get("command", None) | |||
# 接收心跳线程和图片上传停止指令 | |||
if 'stop_heartbeat_imageFileUpdate' == cmdStr: | |||
heartbeat_num += 1 | |||
imageFileUpdate_num += 1 | |||
hb.run_status = False | |||
imageFileUpdate.run_status = False | |||
hb.join() | |||
imageFileUpdate.join() | |||
# 接收进程停止指令 | |||
if 'stop' == cmdStr: | |||
hb.run_status = False | |||
imageFileUpdate.run_status = False | |||
hb.join() | |||
imageFileUpdate.join() | |||
break | |||
except Exception as e: | |||
logger.error("结果反馈异常:") | |||
logger.exception(e) | |||
logger.info("心跳、图片上传,反馈进程执行完成, requestId:{}", self.msg.get("request_id")) | |||
@@ -0,0 +1,22 @@ | |||
from threading import Thread | |||
from loguru import logger | |||
from util import LogUtils | |||
class Common(Thread): | |||
def __init__(self, content, func, args=()): | |||
super(Common, self).__init__() | |||
self.content = content | |||
self.func = func | |||
self.args = args | |||
self.result = None | |||
def get_result(self): | |||
self.join() | |||
return self.result | |||
def run(self): | |||
logger.info("开始执行线程!") | |||
self.result = self.func(self.args) | |||
logger.info("线程停止完成!") |
@@ -0,0 +1,112 @@ | |||
from threading import Thread | |||
from loguru import logger | |||
import cv2 | |||
from util.AliyunSdk import AliyunOssSdk | |||
from util import TimeUtils | |||
import uuid | |||
from entity import FeedBack | |||
from enums.AnalysisTypeEnum import AnalysisType | |||
from enums.AnalysisStatusEnum import AnalysisStatus | |||
import numpy as np | |||
from PIL import Image | |||
class FileUpdate(Thread): | |||
def __init__(self, fbQueue, content, msg, imageQueue, mode_service): | |||
super().__init__() | |||
self.fbQueue = fbQueue | |||
self.content = content | |||
self.run_status = True | |||
self.imageQueue = imageQueue | |||
self.mode_service = mode_service | |||
self.msg = msg | |||
self.cfg = {} | |||
# 执行配置项 | |||
def setCfg(self, key, value): | |||
self.cfg[key] = value | |||
# # 给本线程发送事件 | |||
# def sendEvent(self, eBody): | |||
# self.eventQueue.put(eBody) | |||
# # 获取下一个事件 | |||
# def getEvent(self): | |||
# return self.eventQueue.get() | |||
# # 获取下一个事件 | |||
def getImageQueue(self): | |||
eBody = None | |||
try: | |||
eBody = self.imageQueue.get(block=False) | |||
except Exception as e: | |||
pass | |||
return eBody | |||
# 推送执行结果 | |||
def sendResult(self, result): | |||
self.fbQueue.put(result) | |||
class ImageFileUpdate(FileUpdate): | |||
def run(self): | |||
logger.info("开始启动图片上传线程") | |||
aliyunOssSdk = AliyunOssSdk(self.content) | |||
aliyunOssSdk.get_oss_bucket() | |||
while self.run_status: | |||
try: | |||
image_dict = self.getImageQueue() | |||
if image_dict is not None and len(image_dict) > 0: | |||
# 图片帧数编码 | |||
or_result, or_image = cv2.imencode(".jpg", image_dict.get("or_frame")) | |||
ai_result, ai_image = cv2.imencode(".jpg", image_dict.get("ai_frame")) | |||
# 定义上传图片名称 | |||
random_num = str(uuid.uuid1().hex) | |||
time_now = TimeUtils.now_date_to_str("%Y-%m-%d-%H-%M-%S") | |||
# 图片名称待后期修改 | |||
or_image_name = self.build_image_name(self.msg.get('results_base_dir'), time_now, | |||
str(image_dict.get("current_frame")), | |||
str(image_dict.get("last_frame")), | |||
image_dict.get("question_descrition"), | |||
random_num, | |||
image_dict.get("mode_service"), | |||
self.msg.get('request_id'), "OR") | |||
ai_image_name = self.build_image_name(self.msg.get('results_base_dir'), time_now, | |||
str(image_dict.get("current_frame")), | |||
str(image_dict.get("last_frame")), | |||
image_dict.get("question_descrition"), | |||
random_num, | |||
image_dict.get("mode_service"), | |||
self.msg.get('request_id'), "AI") | |||
# 上传原图片 | |||
aliyunOssSdk.upload_file(or_image_name, Image.fromarray(np.uint8(or_image)).tobytes()) | |||
aliyunOssSdk.upload_file(ai_image_name, Image.fromarray(np.uint8(ai_image)).tobytes()) | |||
# requestId, status, type, error_code="", error_msg="", progress="", original_url="", sign_url="", | |||
# category_id="", description="", analyse_time="" | |||
# 发送kafka消息 | |||
self.sendResult(FeedBack.message_feedback(self.msg.get('request_id'), AnalysisStatus.RUNNING.value, | |||
self.mode_service, "", "", image_dict.get("progress"), | |||
or_image_name, | |||
ai_image_name, image_dict.get("question_code"), | |||
image_dict.get("question_descrition"), | |||
TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
except Exception as e: | |||
logger.error("图片上传异常:") | |||
logger.exception(e) | |||
logger.info("结束图片上传线程") | |||
def build_image_name(self, base_dir, time_now, current_frame, last_frame, descrition, random_num, mode_type, | |||
requestId, image_type): | |||
image_format = "{base_dir}/{time_now}_frame-{current_frame}-{last_frame}_type-{descrition}_{random_num}-{mode_type}-{base_dir}-{requestId}_{image_type}.jpg" | |||
image_name = image_format.format( | |||
base_dir=base_dir, | |||
time_now=time_now, | |||
current_frame=current_frame, | |||
last_frame=last_frame, | |||
descrition=descrition, | |||
random_num=random_num, | |||
mode_type=mode_type, | |||
requestId=requestId, | |||
image_type=image_type) | |||
return image_name |
@@ -0,0 +1,35 @@ | |||
from threading import Thread | |||
import time | |||
from loguru import logger | |||
from util import TimeUtils | |||
from enums.AnalysisStatusEnum import AnalysisStatus | |||
from entity.FeedBack import message_feedback | |||
class Heartbeat(Thread): | |||
def __init__(self, fbQueue, request_id, mode_service): | |||
super().__init__() | |||
self.fbQueue = fbQueue | |||
self.request_id = request_id | |||
self.mode_service = mode_service | |||
self.run_status = True | |||
# 推送执行结果 | |||
def sendResult(self, result): | |||
self.fbQueue.put(result) | |||
def run(self): | |||
logger.info("开始启动心跳线程!") | |||
# 发送waiting状态信息 | |||
feedback = message_feedback(self.request_id, AnalysisStatus.WAITING.value, self.mode_service, | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S")) | |||
self.sendResult(feedback) | |||
num = 0 | |||
while self.run_status: | |||
if num % 60 == 0: | |||
feedback = message_feedback(self.request_id, AnalysisStatus.RUNNING.value, self.mode_service, | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S")) | |||
self.sendResult(feedback) | |||
num += 1 | |||
time.sleep(1) | |||
logger.info("心跳线程停止完成!") |
@@ -0,0 +1,405 @@ | |||
# -*- coding: utf-8 -*- | |||
import os | |||
import time | |||
import uuid | |||
from multiprocessing import Process, Queue | |||
from loguru import logger | |||
from enums.AnalysisStatusEnum import AnalysisStatus | |||
from enums.AnalysisTypeEnum import AnalysisType | |||
from enums.ExceptionEnum import ExceptionType | |||
from enums.AnalysisLabelEnum import AnalysisLabel | |||
from util import LogUtils, TimeUtils | |||
from util.Cv2Utils import Cv2Util | |||
from util import ModelUtils | |||
from entity.FeedBack import message_feedback | |||
from util.AliyunSdk import ThAliyunVodSdk | |||
from concurrency.CommonThread import Common | |||
from concurrency.CommonProcess import CommonProcess | |||
from exception.CustomerException import ServiceException | |||
class IntelligentRecognitionProcess(Process): | |||
def __init__(self, fbQueue, content, msg, imageQueue): | |||
super().__init__() | |||
self.fbQueue = fbQueue | |||
self.eventQueue = Queue() | |||
self.content = content | |||
self.msg = msg | |||
self.imageQueue = imageQueue | |||
self.cfg = {} | |||
# 执行配置项 | |||
def setCfg(self, key, value): | |||
self.cfg[key] = value | |||
# 给本进程发送事件 | |||
def sendEvent(self, eBody): | |||
self.eventQueue.put(eBody) | |||
# 获取下一个事件 | |||
def getEvent(self): | |||
try: | |||
eBody = self.eventQueue.get(block=False) | |||
return eBody | |||
except Exception as e: | |||
pass | |||
# 推送执行结果 | |||
def sendResult(self, result): | |||
self.fbQueue.put(result) | |||
def get_needed_objectsIndex(self, object_config): | |||
needed_objectsIndex=[] | |||
for model in object_config: | |||
try: | |||
needed_objectsIndex.append(int(model['id'])) | |||
except Exception as e: | |||
a=1 | |||
allowedList_str=[str(x) for x in needed_objectsIndex] | |||
allowedList_string=','.join(allowedList_str) | |||
return needed_objectsIndex, allowedList_string | |||
class OnlineIntelligentRecognitionProcess(IntelligentRecognitionProcess): | |||
def upload_local_video(args): | |||
logger.info("args[0]:{} |args[1]:{}| ", args[0], args[1]) | |||
return ThAliyunVodSdk(args[2]).upload_local_video(args[0], args[1]) | |||
def stop_task(self, cv2tool, orFilePath, aiFilePath, commonProcess, snalysisStatus): | |||
if cv2tool.p: | |||
logger.info("{}, 关闭管道", self.msg.get("request_id")) | |||
cv2tool.p.kill() | |||
logger.info("{}, 关闭管道完成", self.msg.get("request_id")) | |||
if cv2tool.or_video_file: | |||
logger.info("{}, 关闭原视频写入流", self.msg.get("request_id")) | |||
cv2tool.or_video_file.release() | |||
logger.info("{}, 关闭原视频写入流完成", self.msg.get("request_id")) | |||
if cv2tool.ai_video_file: | |||
logger.info("{}, 关闭AI视频写入流", self.msg.get("request_id")) | |||
cv2tool.ai_video_file.release() | |||
logger.info("{}, 关闭AI视频写入流完成", self.msg.get("request_id")) | |||
if cv2tool.cap: | |||
logger.info("{}, 关闭cv2 cap", self.msg.get("request_id")) | |||
cv2tool.cap.release() | |||
logger.info("{}, 关闭cv2 cap完成", self.msg.get("request_id")) | |||
if not os.path.exists(orFilePath) or not os.path.exists(aiFilePath): | |||
logger.error("原视频或AI视频不存在!") | |||
raise Exception("原视频或AI视频不存在!") | |||
params1 = (orFilePath, "orOnLineVideo", self.content) | |||
or_update_thread = Common(content= self.content, | |||
func=OnlineIntelligentRecognitionProcess.upload_local_video, | |||
args=params1) | |||
params2 = (aiFilePath, "aiOnLineVideo", self.content) | |||
ai_update_thread = Common(content= self.content, | |||
func=OnlineIntelligentRecognitionProcess.upload_local_video, | |||
args=params2) | |||
or_update_thread.start() | |||
ai_update_thread.start() | |||
or_play_url = or_update_thread.get_result() | |||
ai_play_url = ai_update_thread.get_result() | |||
if or_play_url is None or ai_play_url is None: | |||
logger.error("原视频或AI视频播放上传VOD失败!原视频播放地址:{}, AI播放地址: {}", or_play_url, ai_play_url) | |||
raise Exception("原视频或AI视频上传VOD失败!") | |||
# (requestId, status, type, error_code="", error_msg="", progress="", original_url="", sign_url="", | |||
# category_id="", description="", analyse_time="") | |||
commonProcess.sendEvent({"command": "stop_heartbeat_imageFileUpdate"}) | |||
self.sendResult(message_feedback(self.msg.get("request_id"), snalysisStatus, | |||
AnalysisType.ONLINE.value, | |||
progress="1.0000", | |||
original_url=or_play_url, | |||
sign_url=ai_play_url, | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
def run(self): | |||
# 程序开始时间 | |||
start_time = time.time() | |||
# 启动公共进程包含(图片上传线程,心跳线程,问题反馈线程) | |||
commonProcess = CommonProcess(self.fbQueue, self.content, self.msg, self.imageQueue, AnalysisType.ONLINE.value) | |||
commonProcess.start() | |||
orFilePath = None | |||
aiFilePath = None | |||
try: | |||
LogUtils.init_log(self.content) | |||
# 加载模型 | |||
logger.info("开始加载算法模型") | |||
needed_objectsIndex, allowedList_string = self.get_needed_objectsIndex(self.msg["models"]) | |||
mod = ModelUtils.SZModel(needed_objectsIndex) # 后期修改为对应的 | |||
# mod = ModelUtils.SZModel([0,1,2,3]) | |||
logger.info("加载算法模型完成") | |||
# 定义原视频、AI视频保存名称 | |||
randomStr = str(uuid.uuid1().hex) | |||
orFilePath = os.path.join(self.content["video"]["file_path"], randomStr + "_or_" + self.msg.get("request_id") + ".mp4") | |||
aiFilePath = os.path.join(self.content["video"]["file_path"], randomStr + "_ai_" + self.msg.get("request_id") + ".mp4") | |||
logger.info("开始初始化cv2") | |||
cv2tool = Cv2Util(self.msg.get('pull_url'), self.msg.get('push_url'), orFilePath, aiFilePath) | |||
cv2tool.build_cv2_config() | |||
logger.info("初始化cv2完成") | |||
# cv2重试初始化次数 | |||
cv2_init_num= 1 | |||
# 当时视频帧数 | |||
current_frame = 1 | |||
# 上一次记录视频帧数 | |||
last_frame = 0 | |||
while True: | |||
end_time = time.time() | |||
# 默认6个小时 | |||
if end_time - start_time > int(self.content["service"]["timeout"]): | |||
raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0], | |||
ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1],) | |||
if not commonProcess.is_alive(): | |||
logger.info("图片上传、心跳、问题反馈进程异常停止") | |||
raise Exception("图片上传、心跳、问题反馈进程异常停止") | |||
eBody = self.getEvent() | |||
if eBody is not None and len(eBody) > 0: | |||
logger.info("eBody:{} 任务停止分析", eBody) | |||
cmdStr = eBody.get("command", None) | |||
logger.info("cmdStr:{} 任务停止分析", cmdStr) | |||
# 接收到停止指令 | |||
if 'stop' == cmdStr: | |||
logger.info("{} 任务停止分析", self.msg.get("request_id")) | |||
self.stop_task(cv2tool, orFilePath, aiFilePath, commonProcess, AnalysisStatus.SUCCESS.value) | |||
break | |||
# 检测是否断流 | |||
if cv2tool.cap is None or not cv2tool.cap.isOpened(): | |||
# 默认1个小时 | |||
cv2_startPullStream_timeout = time.time() | |||
if cv2_startPullStream_timeout - start_time > int(self.content["service"]["cv2_pull_stream_timeout"]): | |||
raise ServiceException(ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[0], | |||
ExceptionType.PULLSTREAM_TIMEOUT_EXCEPTION.value[1],) | |||
logger.info("cv2初始化重试:{}次", cv2_init_num) | |||
cv2_init_num += 1 | |||
time.sleep(5) | |||
cv2tool.build_cv2_config() | |||
continue | |||
is_opened, frame = cv2tool.cap.read() | |||
if is_opened is None or not is_opened: | |||
# 默认1个小时 | |||
cv2_readStream_timeout = time.time() | |||
if cv2_readStream_timeout - start_time > int(self.content["service"]["cv2_read_stream_timeout"]): | |||
logger.info("{}: 运行中拉流超时停止分析, 超时时间:{}", self.msg.get("request_id"), int(cv2_readStream_timeout - start_time)) | |||
self.stop_task(cv2tool, orFilePath, aiFilePath, commonProcess, AnalysisStatus.TIMEOUT.value) | |||
break | |||
time.sleep(5) | |||
logger.info("cv2初始化重试:{}次", cv2_init_num) | |||
cv2_init_num += 1 | |||
cv2tool.build_cv2_config() | |||
continue | |||
# time00 = time.time() | |||
# 调用AI模型 | |||
p_result, timeOut = mod.SZ_process(frame) | |||
# time11 = time.time() | |||
# logger.info("算法模型调度时间:{}s", int(time11-time00)) | |||
# AI推流 | |||
cv2tool.p.stdin.write(p_result[1].tostring()) | |||
# 原视频保存本地、AI视频保存本地 | |||
cv2tool.or_video_file.write(frame) | |||
cv2tool.ai_video_file.write(p_result[1]) | |||
# # 问题图片加入队列, 暂时写死,后期修改为真实问题 | |||
if p_result[2] is not None and len(p_result[2]) > 0: | |||
ai_analyse_result = p_result[2][0] | |||
conf_c = round(ai_analyse_result[5], 2) | |||
if current_frame > last_frame and conf_c > float(self.content["service"]["score_threshold"]): | |||
last_frame = current_frame + int(self.content["service"]["frame_step"]) | |||
# 2 [[2.0, 0.3059895932674408, 0.8865740895271301, 0.0338541679084301, 0.21759259700775146, 0.5478515625]] | |||
# 其中每一个元素表示一个目标构成如:[float(cls_c), xc,yc,w,h, float(conf_c)] | |||
# #cls_c--类别,如0,1,2,3; xc,yc,w,h--中心点坐标及宽;conf_c--得分, 取值范围在0-1之间 | |||
label = AnalysisLabel.getAnalysisLabel(str(int(ai_analyse_result[0]))) | |||
image_dict = { | |||
"or_frame": frame, | |||
"ai_frame": p_result[1], | |||
"current_frame": current_frame, | |||
"last_frame": last_frame, | |||
"progress": "", | |||
"mode_service": "online", | |||
"question_code": label.value[2], | |||
"question_descrition": label.value[1] | |||
} | |||
self.imageQueue.put(image_dict) | |||
current_frame += 1 | |||
logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id")) | |||
except ServiceException as s: | |||
commonProcess.sendEvent({"command": "stop_heartbeat_imageFileUpdate"}) | |||
logger.error("服务异常,异常编号:{}, 异常描述:{}", s.code, s.msg) | |||
self.sendResult(message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value, | |||
AnalysisType.ONLINE.value, | |||
s.code, | |||
s.msg, | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
except Exception as e: | |||
commonProcess.sendEvent({"command": "stop_heartbeat_imageFileUpdate"}) | |||
logger.error("拉流异常:") | |||
logger.exception(e) | |||
self.sendResult(message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value, | |||
AnalysisType.ONLINE.value, | |||
ExceptionType.SERVICE_INNER_EXCEPTION.value[0], | |||
ExceptionType.SERVICE_INNER_EXCEPTION.value[1], | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
finally: | |||
time.sleep(3) | |||
commonProcess.sendEvent({"command": "stop"}) | |||
commonProcess.join() | |||
# 删除本地视频文件 | |||
if orFilePath is not None and os.path.exists(orFilePath): | |||
logger.error("开始删除原视频, orFilePath: {}", orFilePath) | |||
os.remove(orFilePath) | |||
logger.error("删除原视频成功, orFilePath: {}", orFilePath) | |||
if aiFilePath is not None and os.path.exists(aiFilePath): | |||
logger.error("开始删除AI视频, aiFilePath: {}", aiFilePath) | |||
os.remove(aiFilePath) | |||
logger.error("删除AI视频成功, aiFilePath: {}", aiFilePath) | |||
class OfflineIntelligentRecognitionProcess(IntelligentRecognitionProcess): | |||
def upload_local_video(args): | |||
logger.info("args[0]:{} |args[1]:{}| ", args[0], args[1]) | |||
return ThAliyunVodSdk(args[2]).upload_local_video(args[0], args[1]) | |||
def stop_task(self, cv2tool, aiFilePath, commonProcess, analysisStatus): | |||
if cv2tool.ai_video_file: | |||
logger.info("{}, 关闭AI视频写入流", self.msg.get("request_id")) | |||
cv2tool.ai_video_file.release() | |||
logger.info("{}, 关闭AI视频写入流完成", self.msg.get("request_id")) | |||
if cv2tool.cap: | |||
logger.info("{}, 关闭cv2 cap", self.msg.get("request_id")) | |||
cv2tool.cap.release() | |||
logger.info("{}, 关闭cv2 cap完成", self.msg.get("request_id")) | |||
if not os.path.exists(aiFilePath): | |||
logger.error("AI视频不存在!") | |||
raise Exception("AI视频不存在!") | |||
params2 = (aiFilePath, "aiOffLineVideo", self.content) | |||
ai_update_thread = Common(content= self.content, | |||
func=OfflineIntelligentRecognitionProcess.upload_local_video, | |||
args=params2) | |||
ai_update_thread.start() | |||
ai_play_url = ai_update_thread.get_result() | |||
if ai_play_url is None: | |||
logger.error("原视频或AI视频播放上传VOD失败!requestId: {}, AI播放地址: {}", | |||
self.msg.get("request_id"), ai_play_url) | |||
raise Exception("AI视频上传VOD失败!") | |||
# (requestId, status, type, error_code="", error_msg="", progress="", original_url="", sign_url="", | |||
# category_id="", description="", analyse_time="") | |||
commonProcess.sendEvent({"command": "stop_heartbeat_imageFileUpdate"}) | |||
self.sendResult(message_feedback(self.msg.get("request_id"), analysisStatus, | |||
AnalysisType.OFFLINE.value, | |||
progress="1.0000", | |||
sign_url=ai_play_url, | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
def run(self): | |||
# 程序开始时间 | |||
start_time = time.time() | |||
# 结果反馈进程启动 | |||
commonProcess = CommonProcess(self.fbQueue, self.content, self.msg, self.imageQueue, AnalysisType.OFFLINE.value) | |||
commonProcess.start() | |||
aiFilePath = None | |||
try: | |||
LogUtils.init_log(self.content) | |||
# 加载模型 | |||
logger.info("开始加载算法模型") | |||
needed_objectsIndex, allowedList_string = self.get_needed_objectsIndex(self.msg["models"]) | |||
mod = ModelUtils.SZModel(needed_objectsIndex) # 后期修改为对应的 | |||
# mod = ModelUtils.SZModel([0,1,2,3]) | |||
logger.info("加载算法模型完成") | |||
# 定义原视频、AI视频保存名称 | |||
randomStr = str(uuid.uuid1().hex) | |||
aiFilePath = os.path.join(self.content["video"]["file_path"], randomStr + "_ai_" + self.msg.get("request_id") + ".mp4") | |||
logger.info("开始初始化cv2") | |||
if self.content["aliyun"]["vod"]["host_address"] in self.msg.get('original_url'): | |||
original_url = self.msg.get('original_url') | |||
else: | |||
original_url = "{}{}".format(self.content["aliyun"]["vod"]["host_address"], self.msg.get('original_url')) | |||
cv2tool = Cv2Util(original_url, aiFilePath=aiFilePath) | |||
cv2tool.build_cv2_config(False) | |||
logger.info("初始化cv2完成") | |||
# cv2重试初始化次数 | |||
cv2_init_num= 1 | |||
# 当时视频帧数 | |||
current_frame = 1 | |||
# 上一次记录视频帧数 | |||
last_frame = 0 | |||
while True: | |||
end_time = time.time() | |||
# 默认6个小时 | |||
if end_time - start_time > int(self.content["service"]["timeout"]): | |||
raise ServiceException(ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[0], | |||
ExceptionType.ANALYSE_TIMEOUT_EXCEPTION.value[1],) | |||
if not commonProcess.is_alive(): | |||
logger.info("图片上传、心跳、问题反馈进程异常停止") | |||
raise Exception("图片上传、心跳、问题反馈进程异常停止") | |||
# 检测是否断流 | |||
if cv2tool.cap is None or not cv2tool.cap.isOpened(): | |||
logger.info("cv2初始化重试:{}次", cv2_init_num) | |||
if cv2_init_num >= 3: | |||
raise ServiceException(ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[0], | |||
ExceptionType.OR_VIDEO_ADDRESS_EXCEPTION.value[1]) | |||
cv2_init_num += 1 | |||
time.sleep(5) | |||
cv2tool.build_cv2_config(False) | |||
continue | |||
is_opened, frame = cv2tool.cap.read() | |||
if is_opened is None or not is_opened: | |||
logger.info("{} 任务停止分析", self.msg.get("request_id")) | |||
self.stop_task(cv2tool, aiFilePath, commonProcess, AnalysisStatus.SUCCESS.value) | |||
break | |||
# time00 = time.time() | |||
# 调用AI模型 | |||
p_result, timeOut = mod.SZ_process(frame) | |||
# time11 = time.time() | |||
# logger.info("算法模型调度时间:{}s", int(time11-time00)) | |||
# 原视频保存本地、AI视频保存本地 | |||
cv2tool.ai_video_file.write(p_result[1]) | |||
# # 问题图片加入队列, 暂时写死,后期修改为真实问题 | |||
if p_result[2] is not None and len(p_result[2]) > 0: | |||
ai_analyse_result = p_result[2][0] | |||
conf_c = round(ai_analyse_result[5], 2) | |||
if current_frame > last_frame and conf_c > float(self.content["service"]["score_threshold"]): | |||
last_frame = current_frame + int(self.content["service"]["frame_step"]) | |||
# 2 [[2.0, 0.3059895932674408, 0.8865740895271301, 0.0338541679084301, 0.21759259700775146, 0.5478515625]] | |||
# 其中每一个元素表示一个目标构成如:[float(cls_c), xc,yc,w,h, float(conf_c)] | |||
# #cls_c--类别,如0,1,2,3; xc,yc,w,h--中心点坐标及宽;conf_c--得分, 取值范围在0-1之间 | |||
label = AnalysisLabel.getAnalysisLabel(str(int(ai_analyse_result[0]))) | |||
image_dict = { | |||
"or_frame": frame, | |||
"ai_frame": p_result[1], | |||
"current_frame": current_frame, | |||
"last_frame": last_frame, | |||
"progress": format(float(cv2tool.cap.get(1))/float(cv2tool.cap.get(7)), '.4f'), | |||
"mode_service": "offline", | |||
"question_code": label.value[2], | |||
"question_descrition": label.value[1] | |||
} | |||
self.imageQueue.put(image_dict) | |||
current_frame += 1 | |||
logger.info("实时进程任务完成,requestId:{}", self.msg.get("request_id")) | |||
except ServiceException as s: | |||
commonProcess.sendEvent({"command": "stop_heartbeat_imageFileUpdate"}) | |||
logger.error("服务异常,异常编号:{}, 异常描述:{}", s.code, s.msg) | |||
self.sendResult(message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value, | |||
AnalysisType.OFFLINE.value, | |||
s.code, | |||
s.msg, | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
except Exception as e: | |||
logger.error("服务异常:") | |||
logger.exception(e) | |||
self.sendResult(message_feedback(self.msg.get("request_id"), AnalysisStatus.FAILED.value, | |||
AnalysisType.OFFLINE.value, | |||
ExceptionType.SERVICE_INNER_EXCEPTION.value[0], | |||
ExceptionType.SERVICE_INNER_EXCEPTION.value[1], | |||
analyse_time=TimeUtils.now_date_to_str("%Y-%m-%d %H:%M:%S"))) | |||
finally: | |||
commonProcess.sendEvent({"command": "stop"}) | |||
commonProcess.join() | |||
# 删除本地视频文件 | |||
if aiFilePath is not None and os.path.exists(aiFilePath): | |||
logger.info("开始删除AI视频, aiFilePath: {}", aiFilePath) | |||
os.remove(aiFilePath) | |||
logger.info("删除AI视频成功, aiFilePath: {}", aiFilePath) | |||
class PhotosIntelligentRecognitionProcess(IntelligentRecognitionProcess): | |||
pass |
@@ -0,0 +1,81 @@ | |||
# -*- coding: utf-8 -*- | |||
import time | |||
from loguru import logger | |||
from queue import Queue | |||
from threading import Thread | |||
from util import KafkaUtils | |||
''' | |||
实时、离线消息拉取线程 | |||
''' | |||
class MessagePollingThread(Thread): | |||
# 实时流分析消息拉取线程 | |||
def __init__(self, name, cfg): | |||
super().__init__() | |||
self.name = name | |||
self.cfg = cfg | |||
self.msgQueue = Queue() | |||
def getMsgQueue(self): | |||
eBody = None | |||
try: | |||
eBody = self.msgQueue.get(block=False) | |||
except Exception as e: | |||
pass | |||
return eBody | |||
def run(self): | |||
logger.info("{} 线程任务开始执行", self.name) | |||
# 指令消息消费 | |||
customerKafkaConsumer = KafkaUtils.CustomerKafkaConsumer(self.cfg["content"]) | |||
customerKafkaConsumer.subscribe(topics=self.cfg["topics"]) | |||
while True: | |||
try: | |||
if self.msgQueue.qsize() > 0: | |||
time.sleep(2) | |||
continue | |||
if customerKafkaConsumer.customerConsumer is None: | |||
customerKafkaConsumer.get_consumer() | |||
customerKafkaConsumer.subscribe(topics=self.cfg["topics"]) | |||
# 拉取消息问题 1:后面运行会吃力,建议每次一条一拉 | |||
msg = customerKafkaConsumer.customerConsumer.poll() | |||
if msg is not None and len(msg) > 0: | |||
self.msgQueue.put(msg) | |||
for k, v in msg.items(): | |||
for m in v: | |||
customerKafkaConsumer.commit_offset(m) | |||
except Exception as e: | |||
logger.error("消息监听异常:") | |||
logger.exception(e) | |||
time.sleep(1) | |||
def poll(self): | |||
if self.msgQueue.qsize() > 0: | |||
return self.getMsgQueue() | |||
return None | |||
class OnlineMessagePollingThread(MessagePollingThread): | |||
# 实时流分析消息拉取线程 | |||
pass | |||
class OfflineMessagePollingThread(MessagePollingThread): | |||
# 实时流分析消息拉取线程 | |||
pass | |||
# if __name__ == '__main__': | |||
# t1 = OnlineMessagePollingThread('t1', {'bootstrap_servers': [ | |||
# 'localhost:9092'], 'group_id': 'algSch', 'topics': ('alg_online_tasks',)}) | |||
# t1.start() | |||
# | |||
# t2 = OfflineMessagePollingThread('t2', {'bootstrap_servers': [ | |||
# 'localhost:9092'], 'group_id': 'algSch', 'topics': ('alg_offline_tasks',)}) | |||
# t2.start() | |||
# | |||
# while True: | |||
# print(t1.poll()) | |||
# print(t2.poll()) | |||
# time.sleep(1) |
@@ -0,0 +1,111 @@ | |||
kafka: | |||
topic: | |||
dsp-alg-online-tasks-topic: dsp-alg-online-tasks | |||
dsp-alg-online-tasks: | |||
partition: [0] | |||
dsp-alg-offline-tasks-topic: dsp-alg-offline-tasks | |||
dsp-alg-offline-tasks: | |||
partition: [0] | |||
dsp-alg-results-topic: dsp-alg-task-results | |||
dsp-alg-task-results: | |||
partition: [0] | |||
dsp-alg-command-topic: dsp-alg-command | |||
dsp-alg-command: | |||
partition: [0] | |||
active: dev | |||
local: | |||
bootstrap_servers: ['192.168.10.66:9092'] | |||
producer: | |||
acks: -1 | |||
retries: 3 | |||
linger_ms: 50 | |||
retry_backoff_ms: 1000 | |||
max_in_flight_requests_per_connection: 5 | |||
consumer: | |||
client_id: AI_server | |||
group_id: dsp-test | |||
auto_offset_reset: latest | |||
enable_auto_commit: False | |||
max_poll_records: 1 | |||
dev: | |||
bootstrap_servers: ['192.168.11.13:9092'] | |||
producer: | |||
acks: -1 | |||
retries: 3 | |||
linger_ms: 50 | |||
retry_backoff_ms: 1000 | |||
max_in_flight_requests_per_connection: 5 | |||
consumer: | |||
client_id: AI_server | |||
group_id: dsp-test | |||
auto_offset_reset: latest | |||
enable_auto_commit: False | |||
max_poll_records: 1 | |||
test: | |||
bootstrap_servers: ['192.168.11.242:9092'] | |||
producer: | |||
acks: -1 | |||
retries: 3 | |||
linger_ms: 50 | |||
retry_backoff_ms: 1000 | |||
max_in_flight_requests_per_connection: 5 | |||
consumer: | |||
client_id: AI_server | |||
group_id: dsp-test | |||
auto_offset_reset: latest | |||
enable_auto_commit: False | |||
max_poll_records: 1 | |||
prod: | |||
bootstrap_servers: ['101.132.127.1:19092'] | |||
producer: | |||
acks: -1 | |||
retries: 3 | |||
linger_ms: 50 | |||
retry_backoff_ms: 1000 | |||
max_in_flight_requests_per_connection: 5 | |||
consumer: | |||
client_id: AI_server | |||
group_id: dsp-test | |||
auto_offset_reset: latest | |||
enable_auto_commit: False | |||
max_poll_records: 1 | |||
video: | |||
file_path: /home/DATA/dsp/ai/video/ | |||
aliyun: | |||
access_key: LTAI5tSJ62TLMUb4SZuf285A | |||
access_secret: MWYynm30filZ7x0HqSHlU3pdLVNeI7 | |||
oss: | |||
endpoint: http://oss-cn-shanghai.aliyuncs.com | |||
bucket: 'ta-tech-image' | |||
connect_timeout: 30 | |||
vod: | |||
host_address: https://vod.play.t-aaron.com/ | |||
ecsRegionId: "cn-shanghai" | |||
service: | |||
score_threshold: 0.80 # 扫描多少得分阀值以上的结果 | |||
frame_step: 100 # 多少帧数步长之间获取一次分析图片 | |||
timeout: 21600 # 一次识别任务超时时间,单位秒,默认6个小时 | |||
cv2_pull_stream_timeout: 3600 # 直播开始视频未推流超时时间 | |||
cv2_read_stream_timeout: 1800 # 直播读流中超时时间 | |||
# 日志设置 | |||
log: | |||
# 是否开启文件输出 True:开启 False:关闭 | |||
enable_file_log: False | |||
# 是否开启控制台日志输出 True:开启 False:关闭 | |||
enable_stderr: True | |||
# 日志打印文件夹 | |||
base_path: /home/DATA/dsp/python/logs/ | |||
# 日志文件名称 | |||
log_name: dsp.log | |||
# 日志打印格式 | |||
log_fmt: "{time:YYYY-MM-DD HH:mm:ss.SSS} [{level}][{process.name}-{process.id}-{thread.name}-{thread.id}][{line}] {module}-{function} - {message}" | |||
# 日志隔离级别 | |||
level: DEBUG | |||
# 日志每天0点创建新文件 | |||
rotation: 00:00 | |||
# 日志保存时间15天 | |||
retention: 15 days | |||
# 线程安全 | |||
enqueue: True | |||
# 编码格式 | |||
encoding: utf8 |
@@ -0,0 +1,11 @@ | |||
# -*- coding: utf-8 -*- | |||
from service import Dispatcher | |||
''' | |||
dsp主程序入口 | |||
''' | |||
if __name__ == '__main__': | |||
print("(♥◠‿◠)ノ゙ DSP【算法调度服务】开始启动 ლ(´ڡ`ლ)゙") | |||
Dispatcher.DispatcherService().start_service() | |||
print("(♥◠‿◠)ノ゙ DSP【算法调度服务】启动成功 ლ(´ڡ`ლ)゙") |
@@ -0,0 +1,20 @@ | |||
def message_feedback(requestId, status, type, error_code="", error_msg="", progress="", original_url="", sign_url="", | |||
category_id="", description="", analyse_time=""): | |||
taskfb = {} | |||
results = [] | |||
result_msg = {} | |||
taskfb["request_id"] = requestId | |||
taskfb["status"] = status | |||
taskfb["type"] = type | |||
taskfb["error_code"] = error_code | |||
taskfb["error_msg"] = error_msg | |||
taskfb["progress"] = progress | |||
result_msg["original_url"] = original_url | |||
result_msg["sign_url"] = sign_url | |||
result_msg["category_id"] = category_id | |||
result_msg["description"] = description | |||
result_msg["analyse_time"] = analyse_time | |||
results.append(result_msg) | |||
taskfb["results"] = results | |||
return taskfb |
@@ -0,0 +1,21 @@ | |||
from enum import Enum, unique | |||
# 分析状态枚举 | |||
@unique | |||
class AnalysisLabel(Enum): | |||
VENT = ("0", "排口", "SL014") | |||
SEWAGE_OUTLET = ("1", "排污口", "SL011") | |||
AQUATIC_VEGETATION = ("2", "水生植被", "SL013") | |||
FLOATING_OBJECTS = ("3", "漂浮物", "SL001") | |||
OTHER = ("4", "其他", "SL001") | |||
def getAnalysisLabel(order): | |||
for label in AnalysisLabel: | |||
if label.value[0] == order: | |||
return label | |||
return None |
@@ -0,0 +1,21 @@ | |||
from enum import Enum, unique | |||
# 分析状态枚举 | |||
@unique | |||
class AnalysisStatus(Enum): | |||
# 等待 | |||
WAITING = "waiting" | |||
# 分析中 | |||
RUNNING = "running" | |||
# 分析完成 | |||
SUCCESS = "success" | |||
# 超时 | |||
TIMEOUT = "timeout" | |||
# 失败 | |||
FAILED = "failed" |
@@ -0,0 +1,12 @@ | |||
from enum import Enum, unique | |||
# 分析类型枚举 | |||
@unique | |||
class AnalysisType(Enum): | |||
# 在线 | |||
ONLINE = "1" | |||
# 离线s | |||
OFFLINE = "2" |
@@ -0,0 +1,18 @@ | |||
from enum import Enum, unique | |||
# 异常枚举 | |||
@unique | |||
class ExceptionType(Enum): | |||
VIDEO_UPDATE_EXCEPTION = ("SP000", "视频上传异常, 请联系工程师定位处理!") | |||
OR_VIDEO_ADDRESS_EXCEPTION = ("SP001", "Original Video Address Error!") | |||
ANALYSE_TIMEOUT_EXCEPTION = ("SP002", "Analysis Timeout Exception!") | |||
PULLSTREAM_TIMEOUT_EXCEPTION = ("SP003", "Pull Stream Timeout Exception!") | |||
READSTREAM_TIMEOUT_EXCEPTION = ("SP004", "READ Stream Timeout Exception!") | |||
SERVICE_INNER_EXCEPTION = ("SP999", "系统内部异常, 请联系工程师定位处理!") |
@@ -0,0 +1,19 @@ | |||
# -*- coding: utf-8 -*- | |||
from loguru import logger | |||
""" | |||
自定义异常 | |||
""" | |||
class ServiceException(Exception): # 继承异常类 | |||
def __init__(self, code, msg): | |||
self.code = code | |||
self.msg = msg | |||
def __str__(self): | |||
logger.error("异常编码:{}, 异常描述:{}", self.code, self.msg) | |||
@@ -0,0 +1,193 @@ | |||
# -*- coding: utf-8 -*- | |||
import torch | |||
import time | |||
from util import YmlUtils, FileUtils, LogUtils | |||
from loguru import logger | |||
from multiprocessing import Queue | |||
from concurrency.IntelligentRecognitionProcess import OnlineIntelligentRecognitionProcess, OfflineIntelligentRecognitionProcess | |||
from concurrency.MessagePollingThread import OfflineMessagePollingThread, OnlineMessagePollingThread | |||
''' | |||
分发服务 | |||
''' | |||
class DispatcherService(): | |||
# 初始化 | |||
def __init__(self): | |||
# 获取DSP环境所需要的配置 | |||
self.content = YmlUtils.getConfigs() | |||
# 初始化日志 | |||
LogUtils.init_log(self.content) | |||
# 检查视频保存地址,不存在创建文件夹,迁移初始化 | |||
FileUtils.create_dir_not_exist(self.content["video"]["file_path"]) | |||
# 记录当前正在执行的实时流分析任务 | |||
self.onlineProcesses = {} | |||
# 记录当前正在执行的离线视频分析任务 | |||
self.offlineProcesses = {} | |||
# 记录当前正在执行的图片分析任务 | |||
# self.photoProcesses = {} | |||
self.onlineMpt = None | |||
self.offlineMpt = None | |||
# 服务调用启动方法 | |||
def start_service(self): | |||
# 解决进程启动模型的环境问题,具体问题百度一下 | |||
#torch.multiprocessing.set_start_method('spawn') | |||
# 启动实时,离线kafka消息拉取线程 | |||
self.Kafka_message_listening() | |||
# 循环消息处理 | |||
while True: | |||
# 检查任务进程运行情况,去除活动的任务 | |||
self.check_process_task() | |||
# 判断是否有空余GPU显卡,待补充 | |||
# gpuStatus = getGPUInfos() | |||
# cuda = get_available_gpu(gpuStatus) | |||
# if cuda: | |||
################## 消息驱动实时流分析进程执行 ################## | |||
onlineMsg = self.onlineMpt.poll() | |||
if onlineMsg is not None and len(onlineMsg) > 0: | |||
for k, v in onlineMsg.items(): | |||
for m in v: | |||
try: | |||
msg = m.value | |||
# 校验kafka消息 | |||
check_result = self.check_online_msg(msg) | |||
if not check_result: | |||
raise Exception("实时任务消息格式非法") | |||
if 'start' == msg.get("command"): | |||
self.startOnlineProcess(msg, self.content) | |||
elif 'stop' == msg.get("command"): | |||
self.stopOnlineProcess(msg) | |||
else: | |||
pass | |||
except Exception as e: | |||
logger.error("实时消息监听异常:") | |||
logger.exception(e) | |||
time.sleep(1) | |||
################## 消息驱动离线视频分析进程执行 ################## | |||
offlineMsg = self.offlineMpt.poll() | |||
if offlineMsg is not None and len(offlineMsg) > 0: | |||
for k, v in offlineMsg.items(): | |||
for m in v: | |||
try: | |||
msg = m.value | |||
# 校验kafka消息 | |||
check_result = self.check_offline_msg(msg) | |||
if not check_result: | |||
raise Exception("离线任务消息格式非法") | |||
self.startOfflineProcess(msg, self.content) | |||
except Exception as e: | |||
logger.error("离线消息监听异常:") | |||
logger.exception(e) | |||
def startOnlineProcess(self, msg, content): | |||
# 相同的requestId不在执行 | |||
if self.onlineProcesses.get(msg.get("request_id"), None) is not None: | |||
logger.warn("重复任务,请稍后再试!requestId:{}", msg.get("request_id")) | |||
raise Exception("重复任务,请稍后再试!requestId:{}".format(msg.get("request_id"))) | |||
# 反馈队列 | |||
fbQueue = Queue() | |||
# 图片队列 | |||
imageQueue = Queue() | |||
# 创建在线识别进程并启动 | |||
oirp = OnlineIntelligentRecognitionProcess(fbQueue, content, msg, imageQueue) | |||
oirp.start() | |||
# 记录请求与进程映射 | |||
self.onlineProcesses[msg.get("request_id")] = oirp | |||
def stopOnlineProcess(self, msg): | |||
ps = self.onlineProcesses.get(msg.get("request_id"), None) | |||
if ps is None: | |||
logger.warn("未查询到该任务,无法停止任务!requestId:{}", msg.get("request_id")) | |||
raise Exception("未查询到该任务,无法停止任务!requestId:{}".format(msg.get("request_id"))) | |||
ps.sendEvent({'command': 'stop'}) | |||
# 检查实时、离线进程任务运行情况,去除不活动的任务 | |||
def check_process_task(self): | |||
for requestId in list(self.onlineProcesses.keys()): | |||
if not self.onlineProcesses[requestId].is_alive(): | |||
del self.onlineProcesses[requestId] | |||
for requestId in list(self.offlineProcesses.keys()): | |||
if not self.offlineProcesses[requestId].is_alive(): | |||
del self.offlineProcesses[requestId] | |||
# 开启离线进程 | |||
def startOfflineProcess(self, msg, content): | |||
# 相同的requestId不在执行 | |||
if self.offlineProcesses.get(msg.get("request_id"), None) is not None: | |||
logger.warn("重复任务,请稍后再试!requestId:{}", msg.get("request_id")) | |||
raise Exception("重复任务,请稍后再试!requestId:{}".format(msg.get("request_id"))) | |||
# 反馈队列 | |||
fbQueue = Queue() | |||
# 图片队列 | |||
imageQueue = Queue() | |||
# 创建在线识别进程并启动 | |||
ofirp = OfflineIntelligentRecognitionProcess(fbQueue, content, msg, imageQueue) | |||
ofirp.start() | |||
self.offlineProcesses[msg.get("request_id")] = ofirp | |||
# 校验实时kafka消息 | |||
def check_online_msg(self, msg): | |||
requestId = msg.get("request_id", None) | |||
command = msg.get("command", None) | |||
models = msg.get("models", None) | |||
pull_url = msg.get("pull_url", None) | |||
push_url = msg.get("push_url", None) | |||
results_base_dir = msg.get("results_base_dir", None) | |||
if command is None: | |||
return False | |||
if requestId is None: | |||
return False | |||
if command == "start" and models is None: | |||
return False | |||
if models is not None: | |||
for model in models: | |||
if model.get("id", None) is None: | |||
return False | |||
if command == "start" and pull_url is None: | |||
return False | |||
if command == "start" and push_url is None: | |||
return False | |||
if command == "start" and results_base_dir is None: | |||
return False | |||
return True | |||
# 校验实时kafka消息 | |||
def check_offline_msg(self, msg): | |||
requestId = msg.get("request_id", None) | |||
models = msg.get("models", None) | |||
original_url = msg.get("original_url", None) | |||
original_type = msg.get("original_type", None) | |||
results_base_dir = msg.get("results_base_dir", None) | |||
if requestId is None: | |||
return False | |||
if models is None: | |||
return False | |||
for model in models: | |||
if model.get("id", None) is None: | |||
return False | |||
if original_url is None: | |||
return False | |||
if original_type is None: | |||
return False | |||
if results_base_dir is None: | |||
return False | |||
return True | |||
# 实时、离线kafka消息监听 | |||
def Kafka_message_listening(self): | |||
# 实时流分析消息拉取 | |||
self.onlineMpt = OnlineMessagePollingThread('online_thread', {'content': self.content, | |||
'topics': [self.content["kafka"]["topic"][ | |||
"dsp-alg-online-tasks-topic"]]}) | |||
# 离线视频分析消息拉取 | |||
self.offlineMpt = OfflineMessagePollingThread('offline_thread', {'content': self.content, | |||
'topics': [self.content["kafka"]["topic"][ | |||
"dsp-alg-offline-tasks-topic"]]}) | |||
# 开启监听线程 | |||
self.onlineMpt.start() | |||
self.offlineMpt.start() |
@@ -0,0 +1,11 @@ | |||
from kafka import KafkaProducer | |||
import json | |||
topicName = 'alg_online_tasks' | |||
eBody = {"request_id": "16446e0d79fb4497b390d1a7f49f3079","command":"stop"} | |||
producer = KafkaProducer(bootstrap_servers=[ | |||
'localhost:9092'], | |||
value_serializer=lambda m: json.dumps(m).encode('utf-8')) | |||
future = producer.send(topicName, key=b'16446e0d79fb4497b390d1a7f49f3079', value=eBody) | |||
result = future.get(timeout=10) | |||
print(result) |
@@ -0,0 +1,19 @@ | |||
import cv2 | |||
def run_opencv_camera(): | |||
# 当video_stream_path = 0 会开启计算机 默认摄像头 也可以为本地视频文件的路径 | |||
cap = cv2.VideoCapture("https://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/40bb06cb-1826d260138-0004-f90c-f2c-7ec68.mp4") | |||
while True: | |||
is_opened, frame = cap.read() | |||
print(is_opened) | |||
# # imshow(window_name,img):显示图片,窗口自适应图片大小 | |||
# # window_name: 指定窗口的名字 | |||
# # img:显示的图片对象 | |||
# # 可以指定多个窗口名称,显示多个图片 | |||
cv2.imshow('frame', frame) | |||
cv2.waitKey(10) # 等待输入任何按键 | |||
# cv2.destoryWindows() | |||
# 关闭 | |||
cap.release() | |||
# 读取视频 | |||
run_opencv_camera() |
@@ -0,0 +1,69 @@ | |||
# import sys | |||
# sys.path.extend(["..", "../util"]) | |||
# from util.AliyunSdk import AliyunVodSdk | |||
# from concurrency.CommonThread import Common | |||
from kafka import KafkaProducer | |||
import json | |||
import threading | |||
# topicName = 'dsp-alg-online-tasks' | |||
# eBody = { | |||
# "request_id": "d4c909912ac741ce81ccef03fd1b2ec45", | |||
# "models": [ | |||
# { | |||
# "id": "0", | |||
# "config": {} | |||
# }, | |||
# { | |||
# "id": "1", | |||
# "config": {} | |||
# }, | |||
# { | |||
# "id": "2", | |||
# "config": {} | |||
# }, | |||
# { | |||
# "id": "3", | |||
# "config": {} | |||
# } | |||
# ], | |||
# "pull_url": "rtmp://live.play.t-aaron.com/live/THSAf_hd", | |||
# "push_url": "rtmp://live.push.t-aaron.com/live/THSAg", | |||
# "results_base_dir": "P20220802133841159" | |||
# } | |||
# producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'], | |||
# value_serializer=lambda m: json.dumps(m).encode('utf-8')) | |||
# future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody) | |||
# result = future.get(timeout=10) | |||
# print(result) | |||
topicName = 'dsp-alg-offline-tasks' | |||
eBody = { | |||
"request_id": "d4c909912ac741ce81ccef03fd1b2ec45", | |||
"models": [ | |||
{ | |||
"id": "0", | |||
"config": {} | |||
}, | |||
{ | |||
"id": "1", | |||
"config": {} | |||
}, | |||
{ | |||
"id": "2", | |||
"config": {} | |||
}, | |||
{ | |||
"id": "3", | |||
"config": {} | |||
} | |||
], | |||
"original_url": "https://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/62d507f9-1826c84c625-0004-f90c-f2c-7ec68.mp4", | |||
"original_type": ".mp4", | |||
"results_base_dir": "P20220802133841159" | |||
} | |||
producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'], | |||
value_serializer=lambda m: json.dumps(m).encode('utf-8')) | |||
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody) | |||
result = future.get(timeout=10) | |||
print(result) |
@@ -0,0 +1,18 @@ | |||
# import sys | |||
# sys.path.extend(["..", "../util"]) | |||
# from util.AliyunSdk import AliyunVodSdk | |||
# from concurrency.CommonThread import Common | |||
from kafka import KafkaProducer | |||
import json | |||
import threading | |||
topicName = 'dsp-alg-command' | |||
eBody = { | |||
"request_id": "d4c909912ac741ce81ccef03fd1b2ec45", | |||
"command": "stop" | |||
} | |||
producer = KafkaProducer(bootstrap_servers=['192.168.11.13:9092'], | |||
value_serializer=lambda m: json.dumps(m).encode('utf-8')) | |||
future = producer.send(topicName, key=b'd4c909912ac741ce81ccef03fd1b2ec45', value=eBody) | |||
result = future.get(timeout=10) | |||
print(result) |
@@ -0,0 +1,126 @@ | |||
# -*- coding: UTF-8 -*- | |||
import json | |||
import traceback | |||
from aliyunsdkcore.client import AcsClient | |||
from aliyunsdkvod.request.v20170321 import CreateUploadVideoRequest | |||
from aliyunsdkvod.request.v20170321 import GetPlayInfoRequest | |||
from voduploadsdk.AliyunVodUtils import * | |||
from voduploadsdk.AliyunVodUploader import AliyunVodUploader | |||
from voduploadsdk.UploadVideoRequest import UploadVideoRequest | |||
# # # 填入AccessKey信息 | |||
# def init_vod_client(accessKeyId, accessKeySecret): | |||
# regionId = 'cn-shanghai' # 点播服务接入地域 | |||
# connectTimeout = 3 # 连接超时,单位为秒 | |||
# return AcsClient(accessKeyId, accessKeySecret, regionId, auto_retry=True, max_retry_time=3, timeout=connectTimeout) | |||
# def create_upload_video(clt): | |||
# request = CreateUploadVideoRequest.CreateUploadVideoRequest() | |||
# request.set_Title('aaaaaa') | |||
# request.set_FileName('/home/thsw2/chenyukun/algSch/data/ai/video/111111.mp4') | |||
# request.set_Description('Video Description') | |||
# # //CoverURL示例:http://192.168.0.0/16/tps/TB1qnJ1PVXXXXXCXXXXXXXXXXXX-700-700.png | |||
# # request.set_CoverURL('<your Cover URL>') | |||
# # request.set_Tags('tag1,tag2') | |||
# request.set_CateId(0) | |||
# | |||
# request.set_accept_format('JSON') | |||
# response = json.loads(clt.do_action_with_exception(request)) | |||
# return response | |||
# | |||
# try: | |||
# clt = init_vod_client('LTAI5tSJ62TLMUb4SZuf285A', 'MWYynm30filZ7x0HqSHlU3pdLVNeI7') | |||
# uploadInfo = create_upload_video(clt) | |||
# print(uploadInfo['UploadAuth']) | |||
# print(json.dumps(uploadInfo, ensure_ascii=True, indent=4)) | |||
# | |||
# except Exception as e: | |||
# print(e) | |||
# print(traceback.format_exc()) | |||
# | |||
# # 刷新音视频凭证 | |||
# from aliyunsdkvod.request.v20170321 import RefreshUploadVideoRequest | |||
# def refresh_upload_video(clt, videoId): | |||
# request = RefreshUploadVideoRequest.RefreshUploadVideoRequest() | |||
# request.set_VideoId(videoId) | |||
# request.set_accept_format('JSON') | |||
# return json.loads(clt.do_action_with_exception(request)) | |||
# | |||
# try: | |||
# clt = init_vod_client('LTAI5tSJ62TLMUb4SZuf285A', 'MWYynm30filZ7x0HqSHlU3pdLVNeI7') | |||
# uploadInfo = refresh_upload_video(clt, uploadInfo["VideoId"]) | |||
# print(json.dumps(uploadInfo, ensure_ascii=False, indent=4)) | |||
# | |||
# except Exception as e: | |||
# print(e) | |||
# print(traceback.format_exc()) | |||
# | |||
# | |||
# # 获取播放地址 | |||
# # # | |||
# def get_play_info(clt, videoId): | |||
# request = GetPlayInfoRequest.GetPlayInfoRequest() | |||
# request.set_accept_format('JSON') | |||
# request.set_VideoId(videoId) | |||
# request.set_AuthTimeout(3600*5) | |||
# response = json.loads(clt.do_action_with_exception(request)) | |||
# return response | |||
# | |||
# try: | |||
# clt = init_vod_client('LTAI5tSJ62TLMUb4SZuf285A', 'MWYynm30filZ7x0HqSHlU3pdLVNeI7') | |||
# playInfo = get_play_info(clt, uploadInfo["VideoId"]) | |||
# print(json.dumps(playInfo, ensure_ascii=False, indent=4)) | |||
# | |||
# except Exception as e: | |||
# print(e) | |||
# print(traceback.format_exc()) | |||
# | |||
# # 获取视频播放凭证 | |||
# from aliyunsdkvod.request.v20170321 import GetVideoPlayAuthRequest | |||
# def get_video_playauth(clt, videoId): | |||
# request = GetVideoPlayAuthRequest.GetVideoPlayAuthRequest() | |||
# request.set_accept_format('JSON') | |||
# request.set_VideoId(videoId) | |||
# request.set_AuthInfoTimeout(3000) | |||
# response = json.loads(clt.do_action_with_exception(request)) | |||
# return response | |||
# | |||
# try: | |||
# clt = init_vod_client('LTAI5tSJ62TLMUb4SZuf285A', 'MWYynm30filZ7x0HqSHlU3pdLVNeI7') | |||
# playAuth = get_video_playauth(clt, uploadInfo["VideoId"]) | |||
# print(json.dumps(playAuth, ensure_ascii=False, indent=4)) | |||
# | |||
# except Exception as e: | |||
# print(e) | |||
# print(traceback.format_exc()) | |||
accessKeyId='LTAI5tSJ62TLMUb4SZuf285A' | |||
accessKeySecret='MWYynm30filZ7x0HqSHlU3pdLVNeI7' | |||
filePath="/data/ai/video/2e6ff252-154b-11ed-874b-18c04d1a13ab_ai_d4c909912ac741ce81ccef03fd1b2ec45.mp4" | |||
# 测试上传本地音视频 | |||
def testUploadLocalVideo(accessKeyId, accessKeySecret, filePath, storageLocation=None): | |||
try: | |||
# 可以指定上传脚本部署的ECS区域。如果ECS区域和视频点播存储区域相同,则自动使用内网上传,上传更快且更省公网流量。 | |||
# ecsRegionId ="cn-shanghai" | |||
# uploader = AliyunVodUploader(accessKeyId, accessKeySecret, ecsRegionId) | |||
# 不指定上传脚本部署的ECS区域。 | |||
uploader = AliyunVodUploader(accessKeyId, accessKeySecret) | |||
uploadVideoRequest = UploadVideoRequest(filePath, 'aiOnLineVideo') | |||
# 可以设置视频封面,如果是本地或网络图片可使用UploadImageRequest上传图片到视频点播,获取到ImageURL | |||
#ImageURL示例:https://example.com/sample-****.jpg | |||
#uploadVideoRequest.setCoverURL('<your Image URL>') | |||
# 标签 | |||
# uploadVideoRequest.setTags('taa') | |||
if storageLocation: | |||
uploadVideoRequest.setStorageLocation(storageLocation) | |||
videoId = uploader.uploadLocalVideo(uploadVideoRequest) | |||
print("videoId: %s" % (videoId)) | |||
except AliyunVodException as e: | |||
print(e) | |||
testUploadLocalVideo(accessKeyId, accessKeySecret, filePath) |
@@ -0,0 +1,134 @@ | |||
import oss2 | |||
from loguru import logger | |||
import time | |||
from alibabacloud_vod20170321.client import Client as vod20170321Client | |||
from alibabacloud_tea_openapi import models as open_api_models | |||
from alibabacloud_vod20170321 import models as vod_20170321_models | |||
from alibabacloud_tea_util import models as util_models | |||
from voduploadsdk.AliyunVodUtils import * | |||
from voduploadsdk.AliyunVodUploader import AliyunVodUploader | |||
from voduploadsdk.UploadVideoRequest import UploadVideoRequest | |||
class AliyunSdk(): | |||
def __init__(self, content): | |||
self.content = content | |||
self.bucket = None | |||
class AliyunOssSdk(AliyunSdk): | |||
def get_oss_bucket(self): | |||
if self.bucket: | |||
return self.bucket | |||
auth = oss2.Auth(self.content["aliyun"]["access_key"], self.content["aliyun"]["access_secret"]) | |||
self.bucket = oss2.Bucket(auth, self.content["aliyun"]["oss"]["endpoint"], | |||
self.content["aliyun"]["oss"]["bucket"], | |||
connect_timeout=self.content["aliyun"]["oss"]["connect_timeout"]) | |||
return self.bucket | |||
def upload_file(self, updatePath, fileByte): | |||
logger.info("开始上传文件到oss") | |||
if not self.bucket: | |||
auth = oss2.Auth(self.content["aliyun"]["access_key"], self.content["aliyun"]["access_secret"]) | |||
self.bucket = oss2.Bucket(auth, self.content["aliyun"]["oss"]["endpoint"], | |||
self.content["aliyun"]["oss"]["bucket"], | |||
connect_timeout=self.content["aliyun"]["oss"]["connect_timeout"]) | |||
MAX_RETRIES = 3 | |||
retry_count = 0 | |||
while True: | |||
try: | |||
self.bucket.put_object(updatePath, fileByte) | |||
logger.info("上传文件到oss成功!") | |||
break | |||
except Exception as e: | |||
retry_count += 1 | |||
time.sleep(2) | |||
logger.info("上传文件到oss失败,重试次数:{}", retry_count) | |||
if retry_count >= MAX_RETRIES: | |||
logger.info("上传文件到oss重试失败") | |||
logger.exception(e) | |||
raise e | |||
class ThAliyunVodSdk(AliyunSdk): | |||
# @staticmethod | |||
# def create_client( | |||
# access_key_id: str, | |||
# access_key_secret: str, | |||
# ) -> vod20170321Client: | |||
# """ | |||
# 使用AK&SK初始化账号Client | |||
# @param access_key_id: | |||
# @param access_key_secret: | |||
# @return: Client | |||
# @throws Exception | |||
# """ | |||
# config = open_api_models.Config( | |||
# # 您的 AccessKey ID, | |||
# access_key_id=access_key_id, | |||
# # 您的 AccessKey Secret, | |||
# access_key_secret=access_key_secret | |||
# ) | |||
# # 访问的域名 | |||
# config.endpoint = f'vod.aliyuncs.com' | |||
# return vod20170321Client(config) | |||
def upload_local_video(self, filePath, file_title, storageLocation=None): | |||
logger.info("开始执行vod视频上传, filePath: {}", filePath) | |||
uploader = AliyunVodUploader(self.content["aliyun"]["access_key"], self.content["aliyun"]["access_secret"]) | |||
uploadVideoRequest = UploadVideoRequest(filePath, file_title) | |||
# 可以设置视频封面,如果是本地或网络图片可使用UploadImageRequest上传图片到视频点播,获取到ImageURL | |||
# ImageURL示例:https://example.com/sample-****.jpg | |||
# uploadVideoRequest.setCoverURL('<your Image URL>') | |||
# 标签 | |||
# uploadVideoRequest.setTags('tag1,tag2') | |||
if storageLocation: | |||
uploadVideoRequest.setStorageLocation(storageLocation) | |||
MAX_RETRIES = 3 | |||
retry_count = 0 | |||
while True: | |||
try: | |||
result = uploader.uploadLocalVideo(uploadVideoRequest) | |||
logger.info("vod视频上传成功, videoId:{}", result.get("VideoId")) | |||
logger.info("vod视频地址: {}", result.get("UploadAddress").get("FileName")) | |||
return result.get("UploadAddress").get("FileName") | |||
except AliyunVodException as e: | |||
retry_count += 1 | |||
time.sleep(3) | |||
logger.info("vod视频上传失败,重试次数:{}", retry_count) | |||
if retry_count >= MAX_RETRIES: | |||
logger.info("vod视频上传重试失败") | |||
logger.exception(e) | |||
raise e | |||
# def get_video_url(self, addition_type: str = None, auth_timeout: int = None, definition: str = None, formats: str = None, | |||
# output_type: str = None, play_config: str = None, re_auth_info: str = None, result_type: str = None, | |||
# stream_type: str = None, video_id: str = None): | |||
# logger.info("开始获取vod视频地址, vedioId:{}", video_id) | |||
# client = AliyunVodSdk.create_client(self.content["aliyun"]["access_key"], self.content["aliyun"]["access_secret"]) | |||
# get_play_info_request = vod_20170321_models.GetPlayInfoRequest(addition_type=addition_type, auth_timeout=auth_timeout, | |||
# definition=definition, formats=formats, output_type=output_type, | |||
# play_config=play_config, re_auth_info=re_auth_info, | |||
# result_type=result_type, stream_type=stream_type, | |||
# video_id=video_id) | |||
# runtime = util_models.RuntimeOptions() | |||
# start = time.time() | |||
# while True: | |||
# try: | |||
# | |||
# # 复制代码运行请自行打印 API 的返回值 | |||
# vod_20170321_models.GetPlayInfoResponse = client.get_play_info_with_options(get_play_info_request, runtime) | |||
# play_url = vod_20170321_models.GetPlayInfoResponse.body.play_info_list.play_info[0].play_url | |||
# logger.info("获取vod视频地址成功,play_url:{}", play_url) | |||
# return play_url | |||
# except Exception as error: | |||
# time.sleep(5) | |||
# end = time.time() | |||
# result = int(end - start) | |||
# logger.info("时间:{}", result) | |||
# if result > 1200: | |||
# logger.info("获取vod视频地址超时失败:") | |||
# raise error |
@@ -0,0 +1,47 @@ | |||
import cv2 | |||
import subprocess as sp | |||
from loguru import logger | |||
class Cv2Util(): | |||
def __init__(self, pullUrl, pushUrl=None, orFilePath=None, aiFilePath=None): | |||
self.pullUrl = pullUrl | |||
self.pushUrl = pushUrl | |||
self.orFilePath = orFilePath | |||
self.aiFilePath = aiFilePath | |||
self.cap = None | |||
self.p = None | |||
self.or_video_file = None | |||
self.ai_video_file = None | |||
def build_cv2_config(self, is_online=True): | |||
try: | |||
self.cap = cv2.VideoCapture(self.pullUrl) | |||
fps = int(self.cap.get(cv2.CAP_PROP_FPS)) | |||
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |||
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |||
if is_online: | |||
command = ['/usr/bin/ffmpeg', | |||
'-y', # 不经过确认,输出时直接覆盖同名文件。 | |||
'-f', 'rawvideo', | |||
'-vcodec','rawvideo', | |||
'-pix_fmt', 'bgr24', | |||
'-s', "{}x{}".format(width, height), | |||
'-r', str(fps), | |||
'-i', '-', # 指定输入文件 | |||
'-c:v', 'libx264', # 指定视频编码器 | |||
'-pix_fmt', 'yuv420p', | |||
'-preset', 'ultrafast', # 指定输出的视频质量,会影响文件的生成速度,有以下几个可用的值 ultrafast, | |||
# superfast, veryfast, faster, fast, medium, slow, slower, veryslow。 | |||
'-f', 'flv', | |||
self.pushUrl] | |||
# 管道配置 | |||
logger.info("fps:{}|height:{}|width:{}", fps, height, width) | |||
self.p = sp.Popen(command, stdin=sp.PIPE) | |||
self.or_video_file = cv2.VideoWriter(self.orFilePath, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) | |||
self.ai_video_file = cv2.VideoWriter(self.aiFilePath, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) | |||
except Exception as e: | |||
logger.error("初始化cv2异常:") | |||
logger.exception(e) | |||
@@ -0,0 +1,15 @@ | |||
# -*- coding: utf-8 -*- | |||
import os | |||
from loguru import logger | |||
''' | |||
文件处理工具类 | |||
''' | |||
def create_dir_not_exist(path): | |||
logger.info("检查文件夹是否存在: {}", path) | |||
if not os.path.exists(path): | |||
logger.info("开始创建文件夹: {}", path) | |||
os.makedirs(path) | |||
logger.info("文件夹创建完成 {}", path) |
@@ -0,0 +1,501 @@ | |||
#@@ -1,43 +1,43 @@ | |||
# GPUtil - GPU utilization | |||
# | |||
# A Python module for programmically getting the GPU utilization from NVIDA GPUs using nvidia-smi | |||
# | |||
# Author: Anders Krogh Mortensen (anderskm) | |||
# Date: 16 January 2017 | |||
# Web: https://github.com/anderskm/gputil | |||
# | |||
# LICENSE | |||
# | |||
# MIT License | |||
# | |||
# Copyright (c) 2017 anderskm | |||
# | |||
# Permission is hereby granted, free of charge, to any person obtaining a copy | |||
# of this software and associated documentation files (the "Software"), to deal | |||
# in the Software without restriction, including without limitation the rights | |||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |||
# copies of the Software, and to permit persons to whom the Software is | |||
# furnished to do so, subject to the following conditions: | |||
# | |||
# The above copyright notice and this permission notice shall be included in all | |||
# copies or substantial portions of the Software. | |||
# | |||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |||
# SOFTWARE. | |||
from subprocess import Popen, PIPE | |||
from distutils import spawn | |||
import os | |||
import math | |||
import random | |||
import time | |||
import sys | |||
import platform | |||
import subprocess | |||
import numpy as np | |||
__version__ = '1.4.0' | |||
class GPU: | |||
def __init__(self, ID, uuid, load, memoryTotal, memoryUsed, memoryFree, driver, gpu_name, serial, display_mode, display_active, temp_gpu): | |||
self.id = ID | |||
self.uuid = uuid | |||
self.load = load | |||
self.memoryUtil = float(memoryUsed)/float(memoryTotal) | |||
self.memoryTotal = memoryTotal | |||
self.memoryUsed = memoryUsed | |||
self.memoryFree = memoryFree | |||
self.driver = driver | |||
self.name = gpu_name | |||
self.serial = serial | |||
self.display_mode = display_mode | |||
self.display_active = display_active | |||
self.temperature = temp_gpu | |||
def __str__(self): | |||
return str(self.__dict__) | |||
class GPUProcess: | |||
def __init__(self, pid, processName, gpuId, gpuUuid, gpuName, usedMemory, | |||
uid, uname): | |||
self.pid = pid | |||
self.processName = processName | |||
self.gpuId = gpuId | |||
self.gpuUuid = gpuUuid | |||
self.gpuName = gpuName | |||
self.usedMemory = usedMemory | |||
self.uid = uid | |||
self.uname = uname | |||
def __str__(self): | |||
return str(self.__dict__) | |||
def safeFloatCast(strNumber): | |||
try: | |||
number = float(strNumber) | |||
except ValueError: | |||
number = float('nan') | |||
return number | |||
#def getGPUs(): | |||
def getNvidiaSmiCmd(): | |||
if platform.system() == "Windows": | |||
# If the platform is Windows and nvidia-smi | |||
# could not be found from the environment path, | |||
#@@ -75,57 +94,97 @@ def getGPUs(): | |||
nvidia_smi = "%s\\Program Files\\NVIDIA Corporation\\NVSMI\\nvidia-smi.exe" % os.environ['systemdrive'] | |||
else: | |||
nvidia_smi = "nvidia-smi" | |||
return nvidia_smi | |||
def getGPUs(): | |||
# Get ID, processing and memory utilization for all GPUs | |||
nvidia_smi = getNvidiaSmiCmd() | |||
try: | |||
p = Popen([nvidia_smi,"--query-gpu=index,uuid,utilization.gpu,memory.total,memory.used,memory.free,driver_version,name,gpu_serial,display_active,display_mode,temperature.gpu", "--format=csv,noheader,nounits"], stdout=PIPE) | |||
stdout, stderror = p.communicate() | |||
p = subprocess.run([ | |||
nvidia_smi, | |||
"--query-gpu=index,uuid,utilization.gpu,memory.total,memory.used,memory.free,driver_version,name,gpu_serial,display_active,display_mode,temperature.gpu", | |||
"--format=csv,noheader,nounits" | |||
], stdout=subprocess.PIPE, encoding='utf8') | |||
stdout, stderror = p.stdout, p.stderr | |||
except: | |||
return [] | |||
output = stdout;#output = stdout.decode('UTF-8') | |||
# output = output[2:-1] # Remove b' and ' from string added by python | |||
#print(output) | |||
output = stdout | |||
## Parse output | |||
# Split on line break | |||
lines = output.split(os.linesep) | |||
#print(lines) | |||
numDevices = len(lines)-1 | |||
GPUs = [] | |||
for g in range(numDevices): | |||
line = lines[g] | |||
#print(line) | |||
vals = line.split(', ') | |||
#print(vals) | |||
for i in range(12): | |||
# print(vals[i]) | |||
if (i == 0): | |||
deviceIds = int(vals[i]) | |||
elif (i == 1): | |||
uuid = vals[i] | |||
elif (i == 2): | |||
gpuUtil = safeFloatCast(vals[i])/100 | |||
elif (i == 3): | |||
memTotal = safeFloatCast(vals[i]) | |||
elif (i == 4): | |||
memUsed = safeFloatCast(vals[i]) | |||
elif (i == 5): | |||
memFree = safeFloatCast(vals[i]) | |||
elif (i == 6): | |||
driver = vals[i] | |||
elif (i == 7): | |||
gpu_name = vals[i] | |||
elif (i == 8): | |||
serial = vals[i] | |||
elif (i == 9): | |||
display_active = vals[i] | |||
elif (i == 10): | |||
display_mode = vals[i] | |||
elif (i == 11): | |||
temp_gpu = safeFloatCast(vals[i]); | |||
deviceIds = int(vals[0]) | |||
uuid = vals[1] | |||
gpuUtil = safeFloatCast(vals[2]) / 100 | |||
memTotal = safeFloatCast(vals[3]) | |||
memUsed = safeFloatCast(vals[4]) | |||
memFree = safeFloatCast(vals[5]) | |||
driver = vals[6] | |||
gpu_name = vals[7] | |||
serial = vals[8] | |||
display_active = vals[9] | |||
display_mode = vals[10] | |||
temp_gpu = safeFloatCast(vals[11]); | |||
GPUs.append(GPU(deviceIds, uuid, gpuUtil, memTotal, memUsed, memFree, driver, gpu_name, serial, display_mode, display_active, temp_gpu)) | |||
return GPUs # (deviceIds, gpuUtil, memUtil) | |||
def getGPUProcesses(): | |||
"""Get all gpu compute processes.""" | |||
global gpuUuidToIdMap | |||
gpuUuidToIdMap = {} | |||
try: | |||
gpus = getGPUs() | |||
for gpu in gpus: | |||
gpuUuidToIdMap[gpu.uuid] = gpu.id | |||
del gpus | |||
except: | |||
pass | |||
nvidia_smi = getNvidiaSmiCmd() | |||
try: | |||
p = subprocess.run([ | |||
nvidia_smi, | |||
"--query-compute-apps=pid,process_name,gpu_uuid,gpu_name,used_memory", | |||
"--format=csv,noheader,nounits" | |||
], stdout=subprocess.PIPE, encoding='utf8') | |||
stdout, stderror = p.stdout, p.stderr | |||
except: | |||
return [] | |||
output = stdout | |||
## Parse output | |||
# Split on line break | |||
lines = output.split(os.linesep) | |||
numProcesses = len(lines) - 1 | |||
processes = [] | |||
for g in range(numProcesses): | |||
line = lines[g] | |||
#print(line) | |||
vals = line.split(', ') | |||
#print(vals) | |||
pid = int(vals[0]) | |||
processName = vals[1] | |||
gpuUuid = vals[2] | |||
gpuName = vals[3] | |||
usedMemory = safeFloatCast(vals[4]) | |||
gpuId = gpuUuidToIdMap[gpuUuid] | |||
if gpuId is None: | |||
gpuId = -1 | |||
# get uid and uname owner of the pid | |||
try: | |||
p = subprocess.run(['ps', f'-p{pid}', '-oruid=,ruser='], | |||
stdout=subprocess.PIPE, encoding='utf8') | |||
uid, uname = p.stdout.split() | |||
uid = int(uid) | |||
except: | |||
uid, uname = -1, '' | |||
processes.append(GPUProcess(pid, processName, gpuId, gpuUuid, | |||
gpuName, usedMemory, uid, uname)) | |||
return processes | |||
def getAvailable(order = 'first', limit=1, maxLoad=0.5, maxMemory=0.5, memoryFree=0, includeNan=False, excludeID=[], excludeUUID=[]): | |||
# order = first | last | random | load | memory | |||
# first --> select the GPU with the lowest ID (DEFAULT) | |||
# last --> select the GPU with the highest ID | |||
# random --> select a random available GPU | |||
# load --> select the GPU with the lowest load | |||
# memory --> select the GPU with the most memory available | |||
# limit = 1 (DEFAULT), 2, ..., Inf | |||
# Limit sets the upper limit for the number of GPUs to return. E.g. if limit = 2, but only one is available, only one is returned. | |||
# Get device IDs, load and memory usage | |||
GPUs = getGPUs() | |||
# Determine, which GPUs are available | |||
GPUavailability = getAvailability(GPUs, maxLoad=maxLoad, maxMemory=maxMemory, memoryFree=memoryFree, includeNan=includeNan, excludeID=excludeID, excludeUUID=excludeUUID) | |||
availAbleGPUindex = [idx for idx in range(0,len(GPUavailability)) if (GPUavailability[idx] == 1)] | |||
# Discard unavailable GPUs | |||
GPUs = [GPUs[g] for g in availAbleGPUindex] | |||
# Sort available GPUs according to the order argument | |||
if (order == 'first'): | |||
GPUs.sort(key=lambda x: float('inf') if math.isnan(x.id) else x.id, reverse=False) | |||
elif (order == 'last'): | |||
GPUs.sort(key=lambda x: float('-inf') if math.isnan(x.id) else x.id, reverse=True) | |||
elif (order == 'random'): | |||
GPUs = [GPUs[g] for g in random.sample(range(0,len(GPUs)),len(GPUs))] | |||
elif (order == 'load'): | |||
GPUs.sort(key=lambda x: float('inf') if math.isnan(x.load) else x.load, reverse=False) | |||
elif (order == 'memory'): | |||
GPUs.sort(key=lambda x: float('inf') if math.isnan(x.memoryUtil) else x.memoryUtil, reverse=False) | |||
# Extract the number of desired GPUs, but limited to the total number of available GPUs | |||
GPUs = GPUs[0:min(limit, len(GPUs))] | |||
# Extract the device IDs from the GPUs and return them | |||
deviceIds = [gpu.id for gpu in GPUs] | |||
return deviceIds | |||
#def getAvailability(GPUs, maxLoad = 0.5, maxMemory = 0.5, includeNan = False): | |||
# # Determine, which GPUs are available | |||
# GPUavailability = np.zeros(len(GPUs)) | |||
# for i in range(len(GPUs)): | |||
# if (GPUs[i].load < maxLoad or (includeNan and np.isnan(GPUs[i].load))) and (GPUs[i].memoryUtil < maxMemory or (includeNan and np.isnan(GPUs[i].memoryUtil))): | |||
# GPUavailability[i] = 1 | |||
def getAvailability(GPUs, maxLoad=0.5, maxMemory=0.5, memoryFree=0, includeNan=False, excludeID=[], excludeUUID=[]): | |||
# Determine, which GPUs are available | |||
GPUavailability = [1 if (gpu.memoryFree>=memoryFree) and (gpu.load < maxLoad or (includeNan and math.isnan(gpu.load))) and (gpu.memoryUtil < maxMemory or (includeNan and math.isnan(gpu.memoryUtil))) and ((gpu.id not in excludeID) and (gpu.uuid not in excludeUUID)) else 0 for gpu in GPUs] | |||
return GPUavailability | |||
def getFirstAvailable(order = 'first', maxLoad=0.5, maxMemory=0.5, attempts=1, interval=900, verbose=False, includeNan=False, excludeID=[], excludeUUID=[]): | |||
#GPUs = getGPUs() | |||
#firstAvailableGPU = np.NaN | |||
#for i in range(len(GPUs)): | |||
# if (GPUs[i].load < maxLoad) & (GPUs[i].memory < maxMemory): | |||
# firstAvailableGPU = GPUs[i].id | |||
# break | |||
#return firstAvailableGPU | |||
for i in range(attempts): | |||
if (verbose): | |||
print('Attempting (' + str(i+1) + '/' + str(attempts) + ') to locate available GPU.') | |||
# Get first available GPU | |||
available = getAvailable(order=order, limit=1, maxLoad=maxLoad, maxMemory=maxMemory, includeNan=includeNan, excludeID=excludeID, excludeUUID=excludeUUID) | |||
# If an available GPU was found, break for loop. | |||
if (available): | |||
if (verbose): | |||
print('GPU ' + str(available) + ' located!') | |||
break | |||
# If this is not the last attempt, sleep for 'interval' seconds | |||
if (i != attempts-1): | |||
time.sleep(interval) | |||
# Check if an GPU was found, or if the attempts simply ran out. Throw error, if no GPU was found | |||
if (not(available)): | |||
raise RuntimeError('Could not find an available GPU after ' + str(attempts) + ' attempts with ' + str(interval) + ' seconds interval.') | |||
# Return found GPU | |||
return available | |||
def showUtilization(all=False, attrList=None, useOldCode=False): | |||
GPUs = getGPUs() | |||
if (all): | |||
if (useOldCode): | |||
print(' ID | Name | Serial | UUID || GPU util. | Memory util. || Memory total | Memory used | Memory free || Display mode | Display active |') | |||
print('------------------------------------------------------------------------------------------------------------------------------') | |||
for gpu in GPUs: | |||
print(' {0:2d} | {1:s} | {2:s} | {3:s} || {4:3.0f}% | {5:3.0f}% || {6:.0f}MB | {7:.0f}MB | {8:.0f}MB || {9:s} | {10:s}'.format(gpu.id,gpu.name,gpu.serial,gpu.uuid,gpu.load*100,gpu.memoryUtil*100,gpu.memoryTotal,gpu.memoryUsed,gpu.memoryFree,gpu.display_mode,gpu.display_active)) | |||
else: | |||
attrList = [[{'attr':'id','name':'ID'}, | |||
{'attr':'name','name':'Name'}, | |||
{'attr':'serial','name':'Serial'}, | |||
{'attr':'uuid','name':'UUID'}], | |||
[{'attr':'temperature','name':'GPU temp.','suffix':'C','transform': lambda x: x,'precision':0}, | |||
{'attr':'load','name':'GPU util.','suffix':'%','transform': lambda x: x*100,'precision':0}, | |||
{'attr':'memoryUtil','name':'Memory util.','suffix':'%','transform': lambda x: x*100,'precision':0}], | |||
[{'attr':'memoryTotal','name':'Memory total','suffix':'MB','precision':0}, | |||
{'attr':'memoryUsed','name':'Memory used','suffix':'MB','precision':0}, | |||
{'attr':'memoryFree','name':'Memory free','suffix':'MB','precision':0}], | |||
[{'attr':'display_mode','name':'Display mode'}, | |||
{'attr':'display_active','name':'Display active'}]] | |||
else: | |||
if (useOldCode): | |||
print(' ID GPU MEM') | |||
print('--------------') | |||
for gpu in GPUs: | |||
print(' {0:2d} {1:3.0f}% {2:3.0f}%'.format(gpu.id, gpu.load*100, gpu.memoryUtil*100)) | |||
else: | |||
attrList = [[{'attr':'id','name':'ID'}, | |||
{'attr':'load','name':'GPU','suffix':'%','transform': lambda x: x*100,'precision':0}, | |||
{'attr':'memoryUtil','name':'MEM','suffix':'%','transform': lambda x: x*100,'precision':0}], | |||
] | |||
if (not useOldCode): | |||
if (attrList is not None): | |||
headerString = '' | |||
GPUstrings = ['']*len(GPUs) | |||
for attrGroup in attrList: | |||
#print(attrGroup) | |||
for attrDict in attrGroup: | |||
headerString = headerString + '| ' + attrDict['name'] + ' ' | |||
headerWidth = len(attrDict['name']) | |||
minWidth = len(attrDict['name']) | |||
attrPrecision = '.' + str(attrDict['precision']) if ('precision' in attrDict.keys()) else '' | |||
attrSuffix = str(attrDict['suffix']) if ('suffix' in attrDict.keys()) else '' | |||
attrTransform = attrDict['transform'] if ('transform' in attrDict.keys()) else lambda x : x | |||
for gpu in GPUs: | |||
attr = getattr(gpu,attrDict['attr']) | |||
attr = attrTransform(attr) | |||
if (isinstance(attr,float)): | |||
attrStr = ('{0:' + attrPrecision + 'f}').format(attr) | |||
elif (isinstance(attr,int)): | |||
attrStr = ('{0:d}').format(attr) | |||
elif (isinstance(attr,str)): | |||
attrStr = attr; | |||
elif (sys.version_info[0] == 2): | |||
if (isinstance(attr,unicode)): | |||
attrStr = attr.encode('ascii','ignore') | |||
else: | |||
raise TypeError('Unhandled object type (' + str(type(attr)) + ') for attribute \'' + attrDict['name'] + '\'') | |||
attrStr += attrSuffix | |||
minWidth = max(minWidth,len(attrStr)) | |||
headerString += ' '*max(0,minWidth-headerWidth) | |||
minWidthStr = str(minWidth - len(attrSuffix)) | |||
for gpuIdx,gpu in enumerate(GPUs): | |||
attr = getattr(gpu,attrDict['attr']) | |||
attr = attrTransform(attr) | |||
if (isinstance(attr,float)): | |||
attrStr = ('{0:'+ minWidthStr + attrPrecision + 'f}').format(attr) | |||
elif (isinstance(attr,int)): | |||
attrStr = ('{0:' + minWidthStr + 'd}').format(attr) | |||
elif (isinstance(attr,str)): | |||
attrStr = ('{0:' + minWidthStr + 's}').format(attr); | |||
elif (sys.version_info[0] == 2): | |||
if (isinstance(attr,unicode)): | |||
attrStr = ('{0:' + minWidthStr + 's}').format(attr.encode('ascii','ignore')) | |||
else: | |||
raise TypeError('Unhandled object type (' + str(type(attr)) + ') for attribute \'' + attrDict['name'] + '\'') | |||
attrStr += attrSuffix | |||
GPUstrings[gpuIdx] += '| ' + attrStr + ' ' | |||
headerString = headerString + '|' | |||
for gpuIdx,gpu in enumerate(GPUs): | |||
GPUstrings[gpuIdx] += '|' | |||
headerSpacingString = '-' * len(headerString) | |||
print(headerString) | |||
print(headerSpacingString) | |||
for GPUstring in GPUstrings: | |||
print(GPUstring) | |||
# Generate gpu uuid to id map | |||
gpuUuidToIdMap = {} | |||
try: | |||
gpus = getGPUs() | |||
for gpu in gpus: | |||
gpuUuidToIdMap[gpu.uuid] = gpu.id | |||
del gpus | |||
except: | |||
pass | |||
def getGPUInfos(): | |||
###返回gpus:list,一个GPU为一个元素-对象 | |||
###########:有属性,'id','load','memoryFree', | |||
###########:'memoryTotal','memoryUsed','memoryUtil','name','serial''temperature','uuid',process | |||
###其中process:每一个计算进程是一个元素--对象 | |||
############:有属性,'gpuId','gpuName','gpuUuid', | |||
############:'gpuid','pid','processName','uid', 'uname','usedMemory' | |||
gpus = getGPUs() | |||
gpuUuidToIdMap={} | |||
for gpu in gpus: | |||
gpuUuidToIdMap[gpu.uuid] = gpu.id | |||
gpu.process=[] | |||
indexx = [x.id for x in gpus ] | |||
process = getGPUProcesses() | |||
for pre in process: | |||
pre.gpuid = gpuUuidToIdMap[pre.gpuUuid] | |||
gpuId = indexx.index(pre.gpuid ) | |||
gpus[gpuId].process.append(pre ) | |||
return gpus | |||
def get_available_gpu(gpuStatus): | |||
##判断是否有空闲的显卡,如果有返回id,没有返回None | |||
cuda=None | |||
for gpus in gpuStatus: | |||
if len(gpus.process) == 0: | |||
cuda = gpus.id | |||
return str(cuda) | |||
return cuda | |||
def get_whether_gpuProcess(): | |||
##判断是否有空闲的显卡,如果有返回id,没有返回None | |||
gpuStatus=getGPUInfos() | |||
gpuProcess=True | |||
for gpus in gpuStatus: | |||
if len(gpus.process) != 0: | |||
gpuProcess = False | |||
return gpuProcess | |||
def get_offlineProcess_gpu(gpuStatus,pidInfos): | |||
gpu_onLine = [] | |||
for gpu in gpuStatus: | |||
for gpuProcess in gpu.process: | |||
pid = gpuProcess.pid | |||
if pid in pidInfos.keys(): | |||
pidType = pidInfos[pid]['type'] | |||
if pidType == 'onLine': | |||
gpu_onLine.append(gpu) | |||
gpu_offLine = set(gpuStatus) - set(gpu_onLine) | |||
return list(gpu_offLine) | |||
def arrange_offlineProcess(gpuStatus,pidInfos,modelMemory=1500): | |||
cudaArrange=[] | |||
gpu_offLine = get_offlineProcess_gpu(gpuStatus,pidInfos) | |||
for gpu in gpu_offLine: | |||
leftMemory = gpu.memoryTotal*0.9 - gpu.memoryUsed | |||
modelCnt = int(leftMemory// modelMemory) | |||
cudaArrange.extend( [gpu.id] * modelCnt ) | |||
return cudaArrange | |||
def get_potential_gpu(gpuStatus,pidInfos): | |||
###所有GPU上都有计算。需要为“在线任务”空出一块显卡。 | |||
###step1:查看所有显卡上是否有“在线任务” | |||
gpu_offLine = get_offlineProcess_gpu(gpuStatus,pidInfos) | |||
if len(gpu_offLine) == 0 : | |||
return False | |||
###step2,找出每张显卡上离线进程的数目 | |||
offLineCnt = [ len(gpu.process) for gpu in gpu_offLine ] | |||
minCntIndex =offLineCnt.index( min(offLineCnt)) | |||
pids = [x.pid for x in gpu_offLine[minCntIndex].process] | |||
return {'cuda':gpu_offLine[minCntIndex].id,'pids':pids } | |||
if __name__=='__main__': | |||
#pres = getGPUProcesses() | |||
#print('###line404:',pres) | |||
gpus = getGPUs() | |||
for gpu in gpus: | |||
gpuUuidToIdMap[gpu.uuid] = gpu.id | |||
print(gpu) | |||
print(gpuUuidToIdMap) | |||
pres = getGPUProcesses() | |||
print('###line404:',pres) | |||
for pre in pres: | |||
print('#'*20) | |||
for ken in ['gpuName','gpuUuid','pid','processName','uid','uname','usedMemory' ]: | |||
print(ken,' ',pre.__getattribute__(ken )) | |||
print(' ') | |||
@@ -0,0 +1,129 @@ | |||
from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata | |||
from kafka.errors import kafka_errors | |||
import json | |||
from loguru import logger | |||
# 生产者 | |||
class CustomerKafkaProducer(): | |||
def __init__(self, content): | |||
self.content = content | |||
configs = self.content["kafka"][self.content["kafka"]["active"]]["producer"] | |||
self.customerProducer = KafkaProducer(bootstrap_servers=self.content["kafka"][self.content["kafka"]["active"]]["bootstrap_servers"], | |||
acks=configs["acks"], | |||
retries=configs["retries"], | |||
linger_ms=configs["linger_ms"], | |||
retry_backoff_ms=configs["retry_backoff_ms"], | |||
max_in_flight_requests_per_connection=configs[ | |||
"max_in_flight_requests_per_connection"], | |||
key_serializer=lambda m: json.dumps(m).encode('utf-8'), | |||
value_serializer=lambda m: json.dumps(m).encode('utf-8')) | |||
# 获取kafka生产者 | |||
def get_producer(self): | |||
if self.customerProducer: | |||
return self.customerProducer | |||
logger.info("配置kafka生产者") | |||
configs = self.content["kafka"][self.content["kafka"]["active"]]["producer"] | |||
self.customerProducer = KafkaProducer(bootstrap_servers=self.content["kafka"][self.content["kafka"]["active"]]["bootstrap_servers"], | |||
acks=configs["acks"], | |||
retries=configs["retries"], | |||
linger_ms=configs["linger_ms"], | |||
retry_backoff_ms=configs["retry_backoff_ms"], | |||
max_in_flight_requests_per_connection=configs[ | |||
"max_in_flight_requests_per_connection"], | |||
key_serializer=lambda m: json.dumps(m).encode('utf-8'), | |||
value_serializer=lambda m: json.dumps(m).encode('utf-8')) | |||
logger.info("配置生产者完成") | |||
return self.customerProducer | |||
# mode 模式1:异步发送 2:同步发送 | |||
# def on_send_success(record_metadata): 成功回调 | |||
# def on_send_error(excp): 失败回调 | |||
def sender(self, topic, key, message, mode, customer_send_success=None, customer_send_error=None): | |||
logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}", topic, key, message, mode) | |||
if mode == 1: | |||
if not customer_send_success: | |||
customer_send_success = CustomerKafkaProducer.on_send_success | |||
if not customer_send_error: | |||
customer_send_error = CustomerKafkaProducer.on_send_error | |||
self.customerProducer.send(topic=topic, key=key, value=message).add_callback( | |||
customer_send_success).add_errback(customer_send_error) | |||
if mode == 2: | |||
try: | |||
self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30) | |||
logger.info("kafka同步发送信息成功") | |||
except kafka_errors as e: | |||
logger.error("kafka同步发送消息异常:") | |||
logger.exception(e) | |||
raise e | |||
def close_producer(self): | |||
self.customerProducer.flush() | |||
self.customerProducer.close() | |||
logger.info("kafka生产者关闭完成") | |||
def on_send_success(record_metadata): | |||
logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}", record_metadata.topic, record_metadata.partition, | |||
record_metadata.offset) | |||
def on_send_error(excp): | |||
logger.error("kafka异步发送消息异常:") | |||
logger.error(excp) | |||
# 生产者 | |||
class CustomerKafkaConsumer(): | |||
def __init__(self, content): | |||
logger.info("初始化消费者") | |||
self.content = content | |||
configs = self.content["kafka"][self.content["kafka"]["active"]]["consumer"] | |||
self.customerConsumer = KafkaConsumer(bootstrap_servers=self.content["kafka"][self.content["kafka"]["active"]]["bootstrap_servers"], | |||
client_id=configs["client_id"], | |||
group_id=configs["group_id"], | |||
auto_offset_reset=configs["auto_offset_reset"], | |||
enable_auto_commit=configs["enable_auto_commit"], | |||
max_poll_records=configs["max_poll_records"], | |||
value_deserializer=lambda m: json.loads(m.decode('utf-8'))) | |||
logger.info("初始化消费者完成") | |||
def get_consumer(self): | |||
logger.info("获取消费者!") | |||
if self.customerConsumer: | |||
logger.info("获取消费者成功!") | |||
return self.customerConsumer | |||
configs = self.content["kafka"][self.content["kafka"]["active"]]["consumer"] | |||
self.customerConsumer = KafkaConsumer(bootstrap_servers=self.content["kafka"][self.content["kafka"]["active"]]["bootstrap_servers"], | |||
client_id=configs["client_id"], | |||
group_id=configs["group_id"], | |||
auto_offset_reset=configs["auto_offset_reset"], | |||
enable_auto_commit=configs["enable_auto_commit"], | |||
max_poll_records=configs["max_poll_records"], | |||
value_deserializer=lambda m: json.loads(m.decode('utf-8'))) | |||
logger.info("消费者获取成功") | |||
return self.customerConsumer | |||
def subscribe(self, topics=(), pattern=None, listener=None): | |||
logger.info("kafka生产者订阅topic:{}", topics) | |||
if topics is None or len(topics) == 0: | |||
logger.error("消费者订阅topic不能为空!") | |||
raise Exception("消费者订阅topic不能为空!") | |||
# 手动配置分区 | |||
# customer_partition = [] | |||
# for topic in topics: | |||
# for p in self.content["kafka"]["topic"][topic]["partition"]: | |||
# customer_partition.append(TopicPartition(topic, p)) | |||
# self.customerConsumer.assign(customer_partition) | |||
# 自动配置 | |||
self.customerConsumer.subscribe(topics=topics, pattern=pattern, listener=listener) | |||
logger.info("kafka生产者订阅topic完成") | |||
def commit_offset(self, message): | |||
logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}", message.topic, message.offset + 1, | |||
message.partition) | |||
tp = TopicPartition(topic=message.topic, partition=message.partition) | |||
self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(message.offset + 1, None))}) | |||
logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}", message.topic, message.offset + 1, | |||
message.partition) |
@@ -0,0 +1,29 @@ | |||
# -*- coding: utf-8 -*- | |||
import sys | |||
import os | |||
from loguru import logger | |||
# 初始化日志配置 | |||
def init_log(content): | |||
print("开始初始化日志配置") | |||
if not os.path.exists(content["log"]["base_path"]): | |||
os.makedirs(content["log"]["base_path"]) | |||
# 移除日志设置 | |||
logger.remove(handler_id=None) | |||
# 打印日志到文件 | |||
if content["log"]["enable_file_log"]: | |||
logger.add(content["log"]["base_path"] + content["log"]["log_name"], | |||
rotation=content["log"]["rotation"], | |||
retention=content["log"]["retention"], | |||
format=content["log"]["log_fmt"], | |||
level=content["log"]["level"], | |||
enqueue=content["log"]["enqueue"], | |||
encoding=content["log"]["encoding"]) | |||
# 控制台输出 | |||
if content["log"]["enable_stderr"]: | |||
logger.add(sys.stderr, | |||
format=content["log"]["log_fmt"], | |||
level=content["log"]["level"], | |||
enqueue=True) | |||
print("完成初始化日志配置") |
@@ -0,0 +1,40 @@ | |||
import sys | |||
sys.path.extend(['..', '../AIlib']) | |||
from segutils.segmodel import SegModel, get_largest_contours | |||
from utils.torch_utils import select_device | |||
from utilsK.queRiver import get_labelnames, get_label_arrays, post_process_ | |||
from models.experimental import attempt_load | |||
from AI import AI_process | |||
class SZModel(): | |||
def __init__(self, allowedList=None): | |||
##预先设置的参数 | |||
self.device_ = '1' ##选定模型,可选 cpu,'0','1' | |||
##以下参数目前不可改 | |||
self.Detweights = "../AIlib/weights/yolov5/class5/best_5classes.pt" | |||
self.seg_nclass = 2 | |||
self.Segweights = "../AIlib/weights/BiSeNet/checkpoint.pth" | |||
self.conf_thres = 0.25 | |||
self.iou_thres = 0.45 | |||
self.classes = 5 | |||
self.labelnames = "../AIlib/weights/yolov5/class5/labelnames.json" | |||
self.rainbows = [[0, 0, 255], [0, 255, 0], [255, 0, 0], [255, 0, 255], [255, 255, 0], [255, 129, 0], | |||
[255, 0, 127], [127, 255, 0], [0, 255, 127], [0, 127, 255], [127, 0, 255], [255, 127, 255], | |||
[255, 255, 127], [127, 255, 255], [0, 255, 255], [255, 127, 255], [127, 255, 255], [0, 127, 0], | |||
[0, 0, 127], [0, 255, 255]] | |||
self.allowedList = allowedList | |||
##加载模型,准备好显示字符 | |||
self.device = select_device(self.device_) | |||
self.names = get_labelnames(self.labelnames) | |||
self.label_arraylist = get_label_arrays(self.names, self.rainbows, outfontsize=40, | |||
fontpath="../AIlib/conf/platech.ttf") | |||
self.half = self.device.type != 'cpu' | |||
self.model = attempt_load(self.Detweights, map_location=self.device) | |||
if self.half: self.model.half() | |||
self.segmodel = self.segmodel = SegModel(nclass=self.seg_nclass, weights=self.Segweights, device=self.device) | |||
def SZ_process(self, frame): | |||
return AI_process([frame], self.model, self.segmodel, self.names, self.label_arraylist, | |||
self.rainbows, self.half, self.device, self.conf_thres, self.iou_thres, | |||
self.allowedList) |
@@ -0,0 +1,12 @@ | |||
import time | |||
import datetime | |||
import calendar | |||
def generate_timestamp(): | |||
"""根据当前时间获取时间戳,返回整数""" | |||
return int(time.time()) | |||
def now_date_to_str(dft): | |||
if not dft: | |||
return None | |||
return datetime.datetime.now().strftime(dft) |
@@ -0,0 +1,18 @@ | |||
import os | |||
import yaml | |||
from common import Constant | |||
# 从配置文件读取所有配置信息 | |||
def getConfigs(): | |||
print("开始读取配置文件,获取配置消息:", Constant.APPLICATION_CONFIG) | |||
applicationConfigPath = os.path.abspath(Constant.APPLICATION_CONFIG) | |||
if not os.path.exists(applicationConfigPath): | |||
raise Exception("未找到配置文件:{}".format(Constant.APPLICATION_CONFIG)) | |||
with open(applicationConfigPath, Constant.R, encoding=Constant.UTF_8) as f: | |||
file_content = f.read() | |||
content = yaml.load(file_content, yaml.FullLoader) | |||
if not content: | |||
raise Exception("配置项不能为空:{}".format(Constant.APPLICATION_CONFIG)) | |||
print("读取配置文件完成!") | |||
return content |