|
|
@@ -1,583 +1,3 @@ |
|
|
|
<<<<<<< HEAD |
|
|
|
import numpy as np |
|
|
|
import time,ast,copy |
|
|
|
#from flask import request, Flask,jsonify |
|
|
|
import base64,cv2,os,sys,json |
|
|
|
#sys.path.extend(['../yolov5']) |
|
|
|
#from Send_tranfer import b64encode_function,JsonSend,name_dic,nameID_dic,getLogFileFp |
|
|
|
from segutils.segmodel import SegModel,get_largest_contours |
|
|
|
from models.experimental import attempt_load |
|
|
|
from utils.datasets import LoadStreams, LoadImages |
|
|
|
from utils.torch_utils import select_device, load_classifier, time_synchronized |
|
|
|
from queRiver import get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str |
|
|
|
import subprocess as sp |
|
|
|
import matplotlib.pyplot as plt |
|
|
|
import torch,random,string |
|
|
|
import multiprocessing |
|
|
|
from multiprocessing import Process,Queue |
|
|
|
import traceback |
|
|
|
from kafka import KafkaProducer, KafkaConsumer,TopicPartition |
|
|
|
from kafka.errors import kafka_errors |
|
|
|
|
|
|
|
#torch.multiprocessing.set_start_method('spawn') |
|
|
|
import utilsK |
|
|
|
from utilsK.GPUtils import * |
|
|
|
from utilsK.masterUtils import * |
|
|
|
from utilsK.sendUtils import create_status_msg,update_json |
|
|
|
|
|
|
|
#from utilsK.modelEval import onlineModelProcsss |
|
|
|
import random,string |
|
|
|
from Send_tranfer_oss import msg_dict_on,msg_dict_off |
|
|
|
|
|
|
|
process_id=0 |
|
|
|
|
|
|
|
def onlineModelProcess(parIn ): |
|
|
|
DEBUG=False |
|
|
|
streamName = parIn['streamName'] |
|
|
|
childCallback=parIn['callback'] |
|
|
|
outStrList={} |
|
|
|
channelIndex=parIn['channelIndex'] |
|
|
|
#try: |
|
|
|
for wan in ['test']: |
|
|
|
jsonfile=parIn['modelJson'] |
|
|
|
with open(jsonfile,'r') as fp: |
|
|
|
parAll = json.load(fp) |
|
|
|
|
|
|
|
Detweights=parAll['gpu_process']['det_weights'] |
|
|
|
seg_nclass = parAll['gpu_process']['seg_nclass'] |
|
|
|
Segweights = parAll['gpu_process']['seg_weights'] |
|
|
|
StreamRecoveringTime=int(parAll['StreamRecoveringTime']) |
|
|
|
videoSave = parAll['AI_video_save'] |
|
|
|
imageTxtFile = parAll['imageTxtFile'] |
|
|
|
taskId,msgId = streamName.split('-')[1:3] |
|
|
|
|
|
|
|
inSource,outSource=parIn['inSource'],parIn['outSource'] |
|
|
|
|
|
|
|
##构建日志文件 |
|
|
|
if outSource != 'NO': |
|
|
|
logdir = parAll['logChildProcessOnline'] |
|
|
|
waitingTime=parAll['StreamWaitingTime'] |
|
|
|
else: |
|
|
|
logdir = parAll['logChildProcessOffline'] |
|
|
|
waitingTime=5 |
|
|
|
logname='gpuprocess.log' |
|
|
|
fp_log=create_logFile(logdir=logdir,name=logname) |
|
|
|
logger=logdir.replace('/','.')+'.'+logname |
|
|
|
kafka_par=parIn['kafka_par'] |
|
|
|
producer = KafkaProducer(bootstrap_servers=kafka_par['server'],value_serializer=lambda v: v.encode('utf-8'),metadata_max_age_ms=120000) |
|
|
|
|
|
|
|
####要先检查视频的有效性 |
|
|
|
###开始的时候,如果在线任务没有流,要发送的心跳消息,msg_h, |
|
|
|
msg_h= copy.deepcopy(msg_dict_off); |
|
|
|
msg_h['status']='waiting';msg_h['msg_id']=msgId |
|
|
|
|
|
|
|
|
|
|
|
thread='master:gpuprocess-%s'%(msgId) |
|
|
|
if outSource == 'NO': |
|
|
|
msg_h['type']=1 |
|
|
|
Stream_ok,_= get_fps_rtmp(inSource,video=True) |
|
|
|
else: |
|
|
|
msg_h['type']=2 |
|
|
|
msg_h_d = json.dumps(msg_h, ensure_ascii=False) |
|
|
|
outStrList=get_infos(taskId, msgId,msg_h_d,key_str='waiting stream or video, send heartbeat') |
|
|
|
Stream_ok=check_stream(inSource,producer,kafka_par,msg_h_d,outStrList,fp_log,logger,line=sys._getframe().f_lineno,thread=thread ,timeMs=waitingTime) |
|
|
|
|
|
|
|
if Stream_ok:###发送开始信号 |
|
|
|
msg_h['status']='running' |
|
|
|
msg_h_d = json.dumps(msg_h, ensure_ascii=False) |
|
|
|
outStrList= get_infos(taskId, msgId,msg_h_d,key_str='informing stream/video is ok') |
|
|
|
send_kafka(producer,kafka_par,msg_h_d,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread ); |
|
|
|
|
|
|
|
else: |
|
|
|
####检测离线视频是否有效,无效要报错 |
|
|
|
outstr='offline vedio or live stream Error:%s '%(inSource) |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print( outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,level='ERROR',line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
msg_h['error']='Stream or video ERROR';msg_h['status']='failed'; |
|
|
|
msg_h_d = json.dumps(msg_h, ensure_ascii=False); |
|
|
|
outStrList= get_infos(taskId, msgId,msg_h_d,key_str='informing invaid video or stream success') |
|
|
|
send_kafka(producer,kafka_par,msg_h_d,outStrList,fp_log ,line=sys._getframe().f_lineno,logger=logger,thread=thread ); |
|
|
|
childCallback.send(' offline vedio or live stream Error') |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (inSource.endswith('.MP4')) or (inSource.endswith('.mp4')): |
|
|
|
fps,outW,outH,totalcnt=get_fps_rtmp(inSource,video=True)[1][0:4]; |
|
|
|
else: |
|
|
|
fps,outW,outH,totalcnt=get_fps_rtmp(inSource,video=False)[1][0:4] |
|
|
|
fps = int(fps+0.5) |
|
|
|
if fps>30: fps=25 ###线下测试时候,有时候读帧率是9000,明显不符合实际,所以加这个判断。 |
|
|
|
if outSource != 'NO': |
|
|
|
command=['/usr/bin/ffmpeg','-y','-f', 'rawvideo','-vcodec','rawvideo','-pix_fmt', 'bgr24', |
|
|
|
'-s', "{}x{}".format(outW,outH),# 图片分辨率 |
|
|
|
'-r', str(fps),# 视频帧率 |
|
|
|
'-i', '-','-c:v', |
|
|
|
'libx264', |
|
|
|
'-pix_fmt', 'yuv420p', |
|
|
|
'-f', 'flv',outSource |
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
video_flag = videoSave['onLine'] |
|
|
|
logdir = parAll['logChildProcessOnline'] |
|
|
|
waitingTime=parAll['StreamWaitingTime'] |
|
|
|
else: |
|
|
|
video_flag = videoSave['offLine'] ;logdir = parAll['logChildProcessOffline'] |
|
|
|
waitingTime=5 |
|
|
|
|
|
|
|
device = select_device(parIn['device']) |
|
|
|
half = device.type != 'cpu' # half precision only supported on CUDA |
|
|
|
model = attempt_load(Detweights, map_location=device) # load FP32 model |
|
|
|
if half: model.half() |
|
|
|
|
|
|
|
segmodel = SegModel(nclass=seg_nclass,weights=Segweights,device=device) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##后处理参数 |
|
|
|
par=parAll['post_process'] |
|
|
|
conf_thres,iou_thres,classes=par['conf_thres'],par['iou_thres'],par['classes'] |
|
|
|
outImaDir = par['outImaDir'] |
|
|
|
outVideoDir = par['outVideoDir'] |
|
|
|
labelnames=par['labelnames'] |
|
|
|
rainbows=par['rainbows'] |
|
|
|
fpsample = par['fpsample'] |
|
|
|
names=get_labelnames(labelnames) |
|
|
|
label_arraylist = get_label_arrays(names,rainbows,outfontsize=40) |
|
|
|
|
|
|
|
#dataset = LoadStreams(inSource, img_size=640, stride=32) |
|
|
|
|
|
|
|
|
|
|
|
childCallback.send('####model load success####') |
|
|
|
print('#####line153:',outVideoDir,video_flag) |
|
|
|
if (outVideoDir!='NO') : ####2022.06.27新增在线任务也要传AI视频和原始视频 |
|
|
|
if video_flag: |
|
|
|
msg_id = streamName.split('-')[2] |
|
|
|
save_path = os.path.join(outVideoDir,msg_id+'.MP4') |
|
|
|
vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH)) |
|
|
|
if vid_writer.isOpened(): outstr='touch video success:%s'%(save_path);level='INFO' |
|
|
|
else:outstr='touch video failed:%s'%(save_path);level='ERROR' |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,level=level,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
else: |
|
|
|
msg_id = streamName.split('-')[2] |
|
|
|
save_path_OR = os.path.join(outVideoDir,msg_id+'_OR.MP4') |
|
|
|
vid_writer_OR = cv2.VideoWriter(save_path_OR, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH)) |
|
|
|
save_path_AI = os.path.join(outVideoDir,msg_id+'_AI.MP4') |
|
|
|
vid_writer_AI = cv2.VideoWriter(save_path_AI, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH)) |
|
|
|
if vid_writer_AI.isOpened() and vid_writer_OR.isOpened() :outstr='touch video success:%s,%s'%(save_path_OR,save_path_AI);level='INFO' |
|
|
|
else:outstr='touch video failed:%s,%s, fps:%d ,%d , %d'%(save_path_OR,save_path_AI,fps,outW,outH);level='ERROR' |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,level=level,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
iframe = 0;post_results=[];time_beg=time.time() |
|
|
|
|
|
|
|
t00=time.time() |
|
|
|
time_kafka0=time.time() |
|
|
|
|
|
|
|
Pushed_Flag=False |
|
|
|
while True: |
|
|
|
try: |
|
|
|
dataset = LoadStreams(inSource, img_size=640, stride=32) |
|
|
|
# 管道配置,其中用到管道 |
|
|
|
if outSource !='NO' and (not Pushed_Flag): |
|
|
|
ppipe = sp.Popen(command, stdin=sp.PIPE);Pushed_Flag = True |
|
|
|
for path, img, im0s, vid_cap in dataset: |
|
|
|
t0= time_synchronized() |
|
|
|
if outSource == 'NO':###如果不推流,则显示进度条。离线不推流 |
|
|
|
view_bar(iframe,totalcnt,time_beg ,parIn['process_uid'] ) |
|
|
|
streamCheckCnt=0 |
|
|
|
###直播和离线都是1分钟发一次消息 |
|
|
|
time_kafka1 = time.time() |
|
|
|
if time_kafka1 - time_kafka0 >60: |
|
|
|
time_kafka0 = time_kafka1 |
|
|
|
###发送状态信息waiting |
|
|
|
msg = copy.deepcopy(msg_dict_off); |
|
|
|
msg['msg_id']= msgId; |
|
|
|
if outSource == 'NO': |
|
|
|
msg['progressbar']= '%.4f'%(iframe*1.0/totalcnt) |
|
|
|
msg['type']=1 |
|
|
|
else: |
|
|
|
msg['progressbarOn']= str(iframe) |
|
|
|
msg['type']=2 |
|
|
|
|
|
|
|
msg = json.dumps(msg, ensure_ascii=False) |
|
|
|
outStrList= get_infos(taskId, msgId,msg,key_str='processing send progressbar or online heartbeat') |
|
|
|
send_kafka(producer,kafka_par,msg,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread ); |
|
|
|
|
|
|
|
time0=time.time() |
|
|
|
iframe +=1 |
|
|
|
time1=time.time() |
|
|
|
img = torch.from_numpy(img).to(device) |
|
|
|
img = img.half() if half else img.float() # uint8 to fp16/32 |
|
|
|
|
|
|
|
img /= 255.0 # 0 - 255 to 0.0 - 1.0 |
|
|
|
|
|
|
|
timeseg0 = time.time() |
|
|
|
seg_pred,segstr = segmodel.eval(im0s[0] ) |
|
|
|
timeseg1 = time.time() |
|
|
|
|
|
|
|
t1= time_synchronized() |
|
|
|
|
|
|
|
pred = model(img,augment=False)[0] |
|
|
|
|
|
|
|
time4 = time.time() |
|
|
|
datas = [path, img, im0s, vid_cap,pred,seg_pred,iframe] |
|
|
|
|
|
|
|
p_result,timeOut = post_process_(datas,conf_thres, iou_thres,names,label_arraylist,rainbows,iframe) |
|
|
|
t2= time_synchronized() |
|
|
|
|
|
|
|
#print('###line138:',timeOut,outSource,outVideoDir) |
|
|
|
##每隔 fpsample帧处理一次,如果有问题就保存图片 |
|
|
|
if (iframe % fpsample == 0) and (len(post_results)>0) : |
|
|
|
parImage=save_problem_images(post_results,iframe,names,streamName=streamName,outImaDir='problems/images_tmp',imageTxtFile=imageTxtFile) |
|
|
|
post_results=[] |
|
|
|
|
|
|
|
if len(p_result[2] )>0: ## |
|
|
|
post_results.append(p_result) |
|
|
|
t3= time_synchronized() |
|
|
|
image_array = p_result[1] |
|
|
|
if outSource!='NO': |
|
|
|
ppipe.stdin.write(image_array.tobytes()) |
|
|
|
|
|
|
|
if (outVideoDir!='NO'): |
|
|
|
if video_flag: ret = vid_writer.write(image_array) |
|
|
|
else: |
|
|
|
time_w0=time.time() |
|
|
|
ret = vid_writer_AI.write(image_array) |
|
|
|
ret = vid_writer_OR.write(im0s[0]) |
|
|
|
time_w1=time.time() |
|
|
|
#if not ret: |
|
|
|
# print('\n write two videos time:%f ms'%(time_w1-time_w0)*1000,ret) |
|
|
|
t4= time_synchronized() |
|
|
|
|
|
|
|
timestr2 = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) |
|
|
|
if iframe%100==0: |
|
|
|
outstr='%s,,read:%.1f ms,copy:%.1f, infer:%.1f ms, detinfer:%.1f ms,draw:%.1f ms, save:%.1f ms total:%.1f ms \n'%(timestr2,(t0 - t00)*1000,(timeseg0-t0)*1000, (t1 - timeseg0)*1000,(t2-t1)*1000, (t3 - t2)*1000,(t4-t3)*1000, (t4-t00)*1000) |
|
|
|
#wrtiteLog(fp_log,outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,line=sys._getframe().f_lineno,logger=logger,printFlag=False) |
|
|
|
#print(outstr) |
|
|
|
t00 = t4; |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
#if outSource:###推流才有如下 |
|
|
|
streamCheckCnt+=1;taskEnd=False |
|
|
|
if streamCheckCnt==1:timeBreak0=time.time();time_kafka0 = time.time() |
|
|
|
timeBreak1=time.time(); |
|
|
|
if timeBreak1-timeBreak0 >5 and Pushed_Flag:###流断开5秒后,要关闭推流 |
|
|
|
ppipe.kill();Pushed_Flag=False |
|
|
|
writeELK_log(msg='stream pip is killed ',fp=fp_log,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
###读接口,看看任务有没有结束 |
|
|
|
ChanellInfos,taskEnd=query_channel_status(channelIndex) |
|
|
|
####taskEnd######################DEBUG |
|
|
|
#taskEnd=False |
|
|
|
|
|
|
|
if timeBreak1-timeBreak0 >StreamRecoveringTime : ##默认30分钟内,流没有恢复的话,就断开。 |
|
|
|
taskEnd=True |
|
|
|
|
|
|
|
outstr_channel='%s ,taskEnd:%s'%(ChanellInfos,taskEnd) |
|
|
|
writeELK_log(msg=outstr_channel,fp=fp_log,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
if outSource == 'NO':#离线没有推流 |
|
|
|
taskEnd=True |
|
|
|
if taskEnd: |
|
|
|
if timeBreak1-timeBreak0 > 60:###超时结束 |
|
|
|
writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='超时结束') |
|
|
|
else: |
|
|
|
writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='结束') |
|
|
|
if (outVideoDir!='NO'): |
|
|
|
if video_flag:vid_writer.release() |
|
|
|
else: |
|
|
|
vid_writer_OR.release(); |
|
|
|
vid_writer_AI.release(); |
|
|
|
outstr='Task ends:%.1f , msgid:%s,taskID:%s '%(timeBreak1-timeBreak0,taskId,msgId) |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
break |
|
|
|
|
|
|
|
##执行到这里的一定是在线任务,在等待流的过程中要发送waiting |
|
|
|
time_kafka1 = time.time() |
|
|
|
if time_kafka1-time_kafka0>60: |
|
|
|
msg_res = copy.deepcopy(msg_dict_off); |
|
|
|
msg_res['msg_id']= msgId; msg_res['type']=2 |
|
|
|
msg_res = json.dumps(msg_res, ensure_ascii=False) |
|
|
|
outStrList= get_infos(taskId, msgId,msg_res,key_str='Waiting stream restoring heartbeat') |
|
|
|
send_kafka(producer,kafka_par,msg_res,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread ); |
|
|
|
|
|
|
|
outstr='Waiting stream recovering:%.1f s'%(timeBreak1-timeBreak0) |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
writeELK_log(msg=outstr_channel,fp=fp_log,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
time_kafka0 = time_kafka1 |
|
|
|
|
|
|
|
#break###断流或者到终点 |
|
|
|
|
|
|
|
time.sleep(5) |
|
|
|
print('Waiting stream for ',e) |
|
|
|
def lauch_process(gpuid,inSource,outSource,taskId,msgId,modelJson,kafka_par,channelIndex='LC001'): |
|
|
|
|
|
|
|
if outSource=='NO': |
|
|
|
streamName='off-%s-%s'%(taskId,msgId) |
|
|
|
else: |
|
|
|
streamName='live-%s-%s'%(taskId,msgId) |
|
|
|
dataPar ={ |
|
|
|
'imgData':'', |
|
|
|
'imgName':'testW', |
|
|
|
'streamName':streamName, |
|
|
|
'taskId':taskId, |
|
|
|
'msgId':msgId, |
|
|
|
'channelIndex':channelIndex, |
|
|
|
'device':str(gpuid), |
|
|
|
'modelJson':modelJson, |
|
|
|
'kafka_par':kafka_par, |
|
|
|
|
|
|
|
} |
|
|
|
#dataPar['inSource'] = 'http://images.5gai.taauav.com/video/8bc32984dd893930dabb2856eb92b4d1.mp4';dataPar['outSource'] = None |
|
|
|
dataPar['inSource'] = inSource;dataPar['outSource'] = outSource |
|
|
|
process_uid=''.join(random.sample(string.ascii_letters + string.digits, 16));dataPar['process_uid']=process_uid |
|
|
|
parent_conn, child_conn = multiprocessing.Pipe();dataPar['callback']=child_conn |
|
|
|
gpuProcess=Process(target=onlineModelProcess,name='process:%s'%( process_uid ),args=(dataPar,)) |
|
|
|
gpuProcess.start() |
|
|
|
#print(dir(gpuProcess)) |
|
|
|
#child_return = parent_conn.recv() |
|
|
|
#timestr2=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()) |
|
|
|
#print(timestr2,'-'*20,'progress:%s ,msgId:%s , taskId:%s return:'%(process_uid,msgId,taskId),child_return) |
|
|
|
|
|
|
|
return gpuProcess |
|
|
|
|
|
|
|
|
|
|
|
msg_dict_offline = { |
|
|
|
|
|
|
|
"biz_id":"hehuzhang", |
|
|
|
"mod_id":"ai", |
|
|
|
"msg_id":'bb'+''.join(random.sample(string.ascii_letters ,30) ) , |
|
|
|
"offering_id":"http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4", |
|
|
|
"offering_type":"mp4", |
|
|
|
"results_base_dir": "XJRW202203171535"+str(random.randint(10,99)), |
|
|
|
|
|
|
|
'outSource':'NO' |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
taskStatus={} |
|
|
|
taskStatus['onLine'] = Queue(100) |
|
|
|
taskStatus['offLine']= Queue(100) |
|
|
|
taskStatus['pidInfos']= {} |
|
|
|
|
|
|
|
def get_msg_from_kafka(par): |
|
|
|
thread='master:readingKafka' |
|
|
|
outStrList={} |
|
|
|
fp_log = par['fp_log'] |
|
|
|
logger=par['logger'] |
|
|
|
consumer = KafkaConsumer(bootstrap_servers=par['server'],client_id='AI_server',group_id=par['group_id'],auto_offset_reset='latest') |
|
|
|
consumer.subscribe( par['topic'][0:2]) |
|
|
|
outstr='reading kafka process starts' |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
kafka_par ={ 'server':par['server'],'topic':par['topic'][2] } |
|
|
|
producer = KafkaProducer( |
|
|
|
bootstrap_servers=par['server'],#tencent yun |
|
|
|
value_serializer=lambda v: v.encode('utf-8'), |
|
|
|
metadata_max_age_ms=120000) |
|
|
|
|
|
|
|
|
|
|
|
for ii,msg in enumerate(consumer): |
|
|
|
##读取消息 |
|
|
|
try: |
|
|
|
taskInfos = eval(msg.value.decode('utf-8') ) |
|
|
|
except: |
|
|
|
outstr='%s msg format error,value:%s,offset:%d partition:%s topic:%s'%('#'*20,msg.value,msg.offset,msg.topic,msg.topic) |
|
|
|
continue |
|
|
|
|
|
|
|
if msg.topic == par['topic'][0]: ## |
|
|
|
taskInfos['inSource']= taskInfos['pull_channel']; |
|
|
|
taskInfos['outSource']= get_push_address(taskInfos['push_channel']) ; |
|
|
|
|
|
|
|
taskStatus['onLine'].put( taskInfos ) |
|
|
|
save_message(par['kafka'],taskInfos) |
|
|
|
|
|
|
|
###发送状态信息waiting |
|
|
|
msg = create_status_msg(msg_dict_on,taskInfos,sts='waiting') |
|
|
|
outStrList=get_infos(taskInfos['results_base_dir'], taskInfos['msg_id'],msg,key_str='read msgs from kafka online task and response to kafka') |
|
|
|
send_kafka(producer,kafka_par,msg,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread); |
|
|
|
|
|
|
|
else: |
|
|
|
try: |
|
|
|
taskInfos['inSource']= taskInfos['offering_id']; |
|
|
|
taskInfos['outSource']= 'NO' |
|
|
|
|
|
|
|
taskStatus['offLine'].put( taskInfos ) |
|
|
|
save_message(par['kafka'],taskInfos) |
|
|
|
|
|
|
|
###发送状态信息waiting |
|
|
|
msg = create_status_msg(msg_dict_off,taskInfos,sts='waiting') |
|
|
|
outStrList=get_infos(taskInfos['results_base_dir'], taskInfos['msg_id'],msg,key_str='read msgs from kafka offline task and response to kafka') |
|
|
|
send_kafka(producer,kafka_par,msg,outStrList,fp_log ,line=sys._getframe().f_lineno,logger=logger,thread=thread ); |
|
|
|
except Exception as e: |
|
|
|
print('######msg Error######',msg,e) |
|
|
|
|
|
|
|
|
|
|
|
def detector(par): |
|
|
|
|
|
|
|
####初始化信息列表 |
|
|
|
kafka_par ={ 'server':par['server'],'topic':par['topic'][2] } |
|
|
|
producer = KafkaProducer( |
|
|
|
bootstrap_servers=par['server'],#tencent yun |
|
|
|
value_serializer=lambda v: v.encode('utf-8'), |
|
|
|
metadata_max_age_ms=120000) |
|
|
|
time_interval=par['logPrintInterval'] |
|
|
|
logname='detector.log';thread='master:detector' |
|
|
|
fp_log=create_logFile(logdir=par['logDir'],name=logname) |
|
|
|
##准备日志函数所需参数 |
|
|
|
logger=par['logDir'].replace('/','.')+'.'+logname |
|
|
|
#wrtiteLog(fp_log,'########### detector process starts ######\n'); |
|
|
|
outstr='detector process starts';sys._getframe().f_lineno |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
###开启kafka consumer 进程## |
|
|
|
parIn=copy.deepcopy(par);parIn['fp_log']=fp_log ;parIn['logger']=logger |
|
|
|
HeartProcess=Process(target=get_msg_from_kafka,name='process-consumer-kafka',args=(parIn,)) |
|
|
|
HeartProcess.start() |
|
|
|
|
|
|
|
|
|
|
|
timeSleep=1 |
|
|
|
|
|
|
|
time0=time.time() |
|
|
|
time0_kafQuery=time.time() |
|
|
|
time0_taskQuery=time.time() |
|
|
|
time0_sleep=time.time() |
|
|
|
outStrList={} |
|
|
|
while True:###每隔timeSleep秒,轮询一次 |
|
|
|
|
|
|
|
time0_taskQuery,printFlag = check_time_interval(time0_taskQuery,time_interval) |
|
|
|
outstr_task= ' task queue onLine cnt:%d offLine:%d'%(taskStatus['onLine'].qsize(), taskStatus['offLine'].qsize()) |
|
|
|
if (taskStatus['onLine'].qsize()>0) or (taskStatus['offLine'].qsize()>0): |
|
|
|
#outstr_task=wrtiteLog(fp_log,outstr_task);print( outstr_task); |
|
|
|
writeELK_log(msg=outstr_task,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
##2-更新显卡信息 |
|
|
|
gpuStatus = getGPUInfos() |
|
|
|
|
|
|
|
##3-优先考虑在线任务 |
|
|
|
if not taskStatus['onLine'].empty(): |
|
|
|
###3.1-先判断有没有空闲显卡: |
|
|
|
cuda = get_available_gpu(gpuStatus) |
|
|
|
###获取在线任务信息,并执行,lauch process |
|
|
|
taskInfos = taskStatus['onLine'].get() |
|
|
|
outstr='start to process onLine taskId:%s msgId:%s'%( taskInfos['results_base_dir'],taskInfos['msg_id'] ) |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print( outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
if cuda: ###3.1.1 -有空余显卡 |
|
|
|
#lauch process |
|
|
|
msg= copy.deepcopy(msg_dict_on); |
|
|
|
gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par,taskInfos['channel_code']) |
|
|
|
taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos} |
|
|
|
|
|
|
|
else:###3.1.2-没有显卡 |
|
|
|
##判断有没有显卡上面都是离线进程的 |
|
|
|
cuda_pid = get_potential_gpu(gpuStatus,taskStatus['pidInfos']) |
|
|
|
|
|
|
|
if cuda_pid:#3.1.2.1 - ##如果有可以杀死的进程 |
|
|
|
cuda = cuda_pid['cuda'] |
|
|
|
pids = cuda_pid['pids'] |
|
|
|
##kill 离线进程,并更新离线任务表 |
|
|
|
cnt_off_0 = taskStatus['offLine'].qsize() |
|
|
|
for pid in pids: |
|
|
|
##kill 离线进程 |
|
|
|
taskStatus['pidInfos'][pid]['gpuProcess'].kill() |
|
|
|
##更新离线任务表 |
|
|
|
taskStatus['offLine'].put( taskStatus['pidInfos'][pid]['taskInfos'] ) |
|
|
|
taskInfos_off=taskStatus['pidInfos'][pid]['taskInfos'] |
|
|
|
|
|
|
|
##发送离线数据,说明状态变成waiting |
|
|
|
msg= msg_dict_off; |
|
|
|
msg=update_json(taskInfos_off,msg,offkeys=["msg_id","biz_id" ,"mod_id"] ) |
|
|
|
msg['results'][0]['original_url']=taskInfos_off['inSource'] |
|
|
|
msg['results'][0]['sign_url']=get_boradcast_address(taskInfos_off['outSource']) |
|
|
|
msg['status']='waiting' |
|
|
|
msg = json.dumps(msg, ensure_ascii=False) |
|
|
|
|
|
|
|
|
|
|
|
outStrList=get_infos(taskInfos_off['results_base_dir'], taskInfos_off['msg_id'],msg,key_str='start online task after kill offline tasks') |
|
|
|
|
|
|
|
send_kafka(producer,kafka_par,msg,outStrList,fp_log ,line=sys._getframe().f_lineno,logger=logger,thread=thread ); |
|
|
|
|
|
|
|
|
|
|
|
cnt_off_1 = taskStatus['offLine'].qsize() |
|
|
|
outstr='before killing process, offtask cnt:%d ,after killing, offtask cnt:%d '%(cnt_off_0,cnt_off_1) |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print( outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par,taskInfos['channel_code']) |
|
|
|
|
|
|
|
###更新pidinfos,update pidInfos |
|
|
|
taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos} |
|
|
|
else: |
|
|
|
outstr='No available GPUs for onLine task' |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print(outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,level='ERROR',thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
##4-更新显卡信息 |
|
|
|
gpuStatus = getGPUInfos() |
|
|
|
##5-考虑离线任务 |
|
|
|
if not taskStatus['offLine'].empty(): |
|
|
|
cudaArrange= arrange_offlineProcess(gpuStatus,taskStatus['pidInfos'],modelMemory=1500) |
|
|
|
outstr='IN OFF LINE TASKS available cudas:%s'%(cudaArrange) |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print( outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
for cuda in cudaArrange: |
|
|
|
if not taskStatus['offLine'].empty(): |
|
|
|
taskInfos = taskStatus['offLine'].get() |
|
|
|
outstr='start to process offLine taskId:%s msgId:%s'%( taskInfos['results_base_dir'],taskInfos['msg_id'] ) |
|
|
|
taskInfos['channel_code']='LC999'###离线消息没有这个参数 |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print( outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par,taskInfos['channel_code']) |
|
|
|
taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'offLine','taskInfos':taskInfos} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if get_whether_gpuProcess(): |
|
|
|
time0_sleep,printFlag = check_time_interval(time0_sleep,time_interval) |
|
|
|
if printFlag: |
|
|
|
outstr= '*'*20 +'sleep '+'*'*20; |
|
|
|
#outstr=wrtiteLog(fp_log,outstr);print( outstr); |
|
|
|
writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger) |
|
|
|
|
|
|
|
time.sleep(timeSleep) |
|
|
|
|
|
|
|
print('########Program End#####') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
par={}; |
|
|
|
###topic0--在线,topic1--离线 |
|
|
|
|
|
|
|
#par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test'; |
|
|
|
#101.132.127.1:19092 |
|
|
|
|
|
|
|
''' |
|
|
|
par['server']='101.132.127.1:19092 ';par['topic']=('alg-online-tasks','alg-offline-tasks','alg-task-results');par['group_id']='test'; |
|
|
|
|
|
|
|
par['kafka']='mintors/kafka' |
|
|
|
par['modelJson']='conf/model.json' |
|
|
|
''' |
|
|
|
masterFile="conf/master.json" |
|
|
|
assert os.path.exists(masterFile) |
|
|
|
with open(masterFile,'r') as fp: |
|
|
|
data=json.load(fp) |
|
|
|
par=data['par'] |
|
|
|
print(par) |
|
|
|
detector(par) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
======= |
|
|
|
import numpy as np
|
|
|
|
import time,ast,copy
|
|
|
|
#from flask import request, Flask,jsonify
|
|
|
@@ -1156,4 +576,3 @@ if __name__ == '__main__': |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
>>>>>>> thsw |