用kafka接收消息
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

587 lines
30KB

  1. import numpy as np
  2. import time,ast,copy
  3. #from flask import request, Flask,jsonify
  4. import base64,cv2,os,sys,json
  5. #sys.path.extend(['../yolov5'])
  6. #from Send_tranfer import b64encode_function,JsonSend,name_dic,nameID_dic,getLogFileFp
  7. from segutils.segmodel import SegModel,get_largest_contours
  8. from models.experimental import attempt_load
  9. from utils.datasets import LoadStreams, LoadImages
  10. from utils.torch_utils import select_device, load_classifier, time_synchronized
  11. from queRiver import get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
  12. import subprocess as sp
  13. import matplotlib.pyplot as plt
  14. import torch,random,string
  15. import multiprocessing
  16. from multiprocessing import Process,Queue
  17. import traceback
  18. from kafka import KafkaProducer, KafkaConsumer,TopicPartition
  19. from kafka.errors import kafka_errors
  20. #torch.multiprocessing.set_start_method('spawn')
  21. import utilsK
  22. from utilsK.GPUtils import *
  23. from utilsK.masterUtils import *
  24. from utilsK.sendUtils import create_status_msg,update_json,get_today
  25. #from utilsK.modelEval import onlineModelProcsss
  26. import random,string
  27. from Send_tranfer_oss import msg_dict_on,msg_dict_off
  28. process_id=0
  29. def onlineModelProcess(parIn ):
  30. DEBUG=False
  31. streamName = parIn['streamName']
  32. childCallback=parIn['callback']
  33. outStrList={}
  34. channelIndex=parIn['channelIndex']
  35. #try:
  36. for wan in ['test']:
  37. jsonfile=parIn['modelJson']
  38. with open(jsonfile,'r') as fp:
  39. parAll = json.load(fp)
  40. Detweights=parAll['gpu_process']['det_weights']
  41. seg_nclass = parAll['gpu_process']['seg_nclass']
  42. Segweights = parAll['gpu_process']['seg_weights']
  43. StreamRecoveringTime=int(parAll['StreamRecoveringTime'])
  44. videoSave = parAll['AI_video_save']
  45. imageTxtFile = parAll['imageTxtFile']
  46. taskId,msgId = streamName.split('-')[1:3]
  47. inSource,outSource=parIn['inSource'],parIn['outSource']
  48. ##构建日志文件
  49. if outSource != 'NO':
  50. logdir = parAll['logChildProcessOnline']
  51. waitingTime=parAll['StreamWaitingTime']
  52. else:
  53. logdir = parAll['logChildProcessOffline']
  54. waitingTime=5
  55. logname='gpuprocess.log'
  56. fp_log=create_logFile(logdir=logdir,name=logname)
  57. logger=logdir.replace('/','.')+'.'+logname
  58. kafka_par=parIn['kafka_par']
  59. producer = KafkaProducer(bootstrap_servers=kafka_par['server'],value_serializer=lambda v: v.encode('utf-8'),metadata_max_age_ms=120000)
  60. ####要先检查视频的有效性
  61. ###开始的时候,如果在线任务没有流,要发送的心跳消息,msg_h,
  62. msg_h= copy.deepcopy(msg_dict_off);
  63. msg_h['status']='waiting';msg_h['msg_id']=msgId
  64. thread='master:gpuprocess-%s'%(msgId)
  65. if outSource == 'NO':
  66. msg_h['type']=1
  67. Stream_ok,_= get_fps_rtmp(inSource,video=True)
  68. else:
  69. msg_h['type']=2
  70. msg_h_d = json.dumps(msg_h, ensure_ascii=False)
  71. outStrList=get_infos(taskId, msgId,msg_h_d,key_str='waiting stream or video, send heartbeat')
  72. Stream_ok=check_stream(inSource,producer,kafka_par,msg_h_d,outStrList,fp_log,logger,line=sys._getframe().f_lineno,thread=thread ,timeMs=waitingTime)
  73. if Stream_ok:###发送开始信号
  74. msg_h['status']='running'
  75. msg_h_d = json.dumps(msg_h, ensure_ascii=False)
  76. outStrList= get_infos(taskId, msgId,msg_h_d,key_str='informing stream/video is ok')
  77. send_kafka(producer,kafka_par,msg_h_d,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  78. else:
  79. ####检测离线视频是否有效,无效要报错
  80. outstr='offline vedio or live stream Error:%s '%(inSource)
  81. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  82. writeELK_log(msg=outstr,fp=fp_log,level='ERROR',line=sys._getframe().f_lineno,logger=logger)
  83. msg_h['error']='Stream or video ERROR';msg_h['status']='failed';
  84. msg_h_d = json.dumps(msg_h, ensure_ascii=False);
  85. outStrList= get_infos(taskId, msgId,msg_h_d,key_str='informing invaid video or stream success')
  86. send_kafka(producer,kafka_par,msg_h_d,outStrList,fp_log ,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  87. childCallback.send(' offline vedio or live stream Error')
  88. continue
  89. if (inSource.endswith('.MP4')) or (inSource.endswith('.mp4')):
  90. fps,outW,outH,totalcnt=get_fps_rtmp(inSource,video=True)[1][0:4];
  91. else:
  92. fps,outW,outH,totalcnt=get_fps_rtmp(inSource,video=False)[1][0:4]
  93. fps = int(fps+0.5)
  94. if fps>30: fps=25 ###线下测试时候,有时候读帧率是9000,明显不符合实际,所以加这个判断。
  95. if outSource != 'NO':
  96. command=['/usr/bin/ffmpeg','-y','-f', 'rawvideo','-vcodec','rawvideo','-pix_fmt', 'bgr24',
  97. '-s', "{}x{}".format(outW,outH),# 图片分辨率
  98. '-r', str(fps),# 视频帧率
  99. '-i', '-','-c:v',
  100. 'libx264',
  101. '-pix_fmt', 'yuv420p',
  102. '-f', 'flv',outSource
  103. ]
  104. video_flag = videoSave['onLine']
  105. logdir = parAll['logChildProcessOnline']
  106. waitingTime=parAll['StreamWaitingTime']
  107. else:
  108. video_flag = videoSave['offLine'] ;logdir = parAll['logChildProcessOffline']
  109. waitingTime=5
  110. device = select_device(parIn['device'])
  111. half = device.type != 'cpu' # half precision only supported on CUDA
  112. model = attempt_load(Detweights, map_location=device) # load FP32 model
  113. if half: model.half()
  114. segmodel = SegModel(nclass=seg_nclass,weights=Segweights,device=device)
  115. ##后处理参数
  116. par=parAll['post_process']
  117. conf_thres,iou_thres,classes=par['conf_thres'],par['iou_thres'],par['classes']
  118. outImaDir = par['outImaDir']
  119. outVideoDir = par['outVideoDir']
  120. labelnames=par['labelnames']
  121. rainbows=par['rainbows']
  122. fpsample = par['fpsample']
  123. names=get_labelnames(labelnames)
  124. label_arraylist = get_label_arrays(names,rainbows,outfontsize=40)
  125. #dataset = LoadStreams(inSource, img_size=640, stride=32)
  126. childCallback.send('####model load success####')
  127. print('#####line153:',outVideoDir,video_flag)
  128. os.makedirs( os.path.join(outVideoDir,get_today()) ,exist_ok=True)
  129. if (outVideoDir!='NO') : ####2022.06.27新增在线任务也要传AI视频和原始视频
  130. if video_flag:
  131. msg_id = streamName.split('-')[2]
  132. save_path = os.path.join(outVideoDir,get_today(),msg_id+'.MP4')
  133. vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH))
  134. if vid_writer.isOpened(): outstr='touch video success:%s'%(save_path);level='INFO'
  135. else:outstr='touch video failed:%s'%(save_path);level='ERROR'
  136. writeELK_log(msg=outstr,fp=fp_log,level=level,line=sys._getframe().f_lineno,logger=logger)
  137. else:
  138. msg_id = streamName.split('-')[2]
  139. save_path_OR = os.path.join(outVideoDir,get_today(),msg_id+'_OR.MP4')
  140. vid_writer_OR = cv2.VideoWriter(save_path_OR, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH))
  141. save_path_AI = os.path.join(outVideoDir,get_today(),msg_id+'_AI.MP4')
  142. vid_writer_AI = cv2.VideoWriter(save_path_AI, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH))
  143. if vid_writer_AI.isOpened() and vid_writer_OR.isOpened() :outstr='touch video success:%s,%s'%(save_path_OR,save_path_AI);level='INFO'
  144. else:outstr='touch video failed:%s,%s, fps:%d ,%d , %d'%(save_path_OR,save_path_AI,fps,outW,outH);level='ERROR'
  145. writeELK_log(msg=outstr,fp=fp_log,level=level,line=sys._getframe().f_lineno,logger=logger)
  146. iframe = 0;post_results=[];time_beg=time.time()
  147. t00=time.time()
  148. time_kafka0=time.time()
  149. Pushed_Flag=False
  150. while True:
  151. try:
  152. dataset = LoadStreams(inSource, img_size=640, stride=32)
  153. # 管道配置,其中用到管道
  154. if outSource !='NO' and (not Pushed_Flag):
  155. ppipe = sp.Popen(command, stdin=sp.PIPE);Pushed_Flag = True
  156. for path, img, im0s, vid_cap in dataset:
  157. t0= time_synchronized()
  158. if outSource == 'NO':###如果不推流,则显示进度条。离线不推流
  159. view_bar(iframe,totalcnt,time_beg ,parIn['process_uid'] )
  160. streamCheckCnt=0
  161. ###直播和离线都是1分钟发一次消息
  162. time_kafka1 = time.time()
  163. if time_kafka1 - time_kafka0 >60:
  164. time_kafka0 = time_kafka1
  165. ###发送状态信息waiting
  166. msg = copy.deepcopy(msg_dict_off);
  167. msg['msg_id']= msgId;
  168. if outSource == 'NO':
  169. msg['progressbar']= '%.4f'%(iframe*1.0/totalcnt)
  170. msg['type']=1
  171. else:
  172. msg['progressbarOn']= str(iframe)
  173. msg['type']=2
  174. msg = json.dumps(msg, ensure_ascii=False)
  175. outStrList= get_infos(taskId, msgId,msg,key_str='processing send progressbar or online heartbeat')
  176. send_kafka(producer,kafka_par,msg,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  177. time0=time.time()
  178. iframe +=1
  179. time1=time.time()
  180. img = torch.from_numpy(img).to(device)
  181. img = img.half() if half else img.float() # uint8 to fp16/32
  182. img /= 255.0 # 0 - 255 to 0.0 - 1.0
  183. timeseg0 = time.time()
  184. seg_pred,segstr = segmodel.eval(im0s[0] )
  185. timeseg1 = time.time()
  186. t1= time_synchronized()
  187. pred = model(img,augment=False)[0]
  188. time4 = time.time()
  189. datas = [path, img, im0s, vid_cap,pred,seg_pred,iframe]
  190. p_result,timeOut = post_process_(datas,conf_thres, iou_thres,names,label_arraylist,rainbows,iframe)
  191. t2= time_synchronized()
  192. #print('###line138:',timeOut,outSource,outVideoDir)
  193. ##每隔 fpsample帧处理一次,如果有问题就保存图片
  194. if (iframe % fpsample == 0) and (len(post_results)>0) :
  195. parImage=save_problem_images(post_results,iframe,names,streamName=streamName,outImaDir='problems/images_tmp',imageTxtFile=imageTxtFile)
  196. post_results=[]
  197. if len(p_result[2] )>0: ##
  198. post_results.append(p_result)
  199. t3= time_synchronized()
  200. image_array = p_result[1]
  201. if outSource!='NO':
  202. ppipe.stdin.write(image_array.tobytes())
  203. if (outVideoDir!='NO'):
  204. if video_flag: ret = vid_writer.write(image_array)
  205. else:
  206. time_w0=time.time()
  207. ret = vid_writer_AI.write(image_array)
  208. ret = vid_writer_OR.write(im0s[0])
  209. time_w1=time.time()
  210. #if not ret:
  211. # print('\n write two videos time:%f ms'%(time_w1-time_w0)*1000,ret)
  212. t4= time_synchronized()
  213. timestr2 = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
  214. if iframe%100==0:
  215. outstr='%s,,read:%.1f ms,copy:%.1f, infer:%.1f ms, detinfer:%.1f ms,draw:%.1f ms, save:%.1f ms total:%.1f ms \n'%(timestr2,(t0 - t00)*1000,(timeseg0-t0)*1000, (t1 - timeseg0)*1000,(t2-t1)*1000, (t3 - t2)*1000,(t4-t3)*1000, (t4-t00)*1000)
  216. #wrtiteLog(fp_log,outstr);
  217. writeELK_log(msg=outstr,fp=fp_log,line=sys._getframe().f_lineno,logger=logger,printFlag=False)
  218. #print(outstr)
  219. t00 = t4;
  220. except Exception as e:
  221. #if outSource:###推流才有如下
  222. streamCheckCnt+=1;taskEnd=False
  223. if streamCheckCnt==1:timeBreak0=time.time();time_kafka0 = time.time()
  224. timeBreak1=time.time();
  225. if timeBreak1-timeBreak0 >5 and Pushed_Flag:###流断开5秒后,要关闭推流
  226. ppipe.kill();Pushed_Flag=False
  227. writeELK_log(msg='stream pip is killed ',fp=fp_log,line=sys._getframe().f_lineno,logger=logger)
  228. ###读接口,看看任务有没有结束
  229. ChanellInfos,taskEnd=query_channel_status(channelIndex)
  230. ####taskEnd######################DEBUG
  231. #taskEnd=False
  232. if timeBreak1-timeBreak0 >StreamRecoveringTime : ##默认30分钟内,流没有恢复的话,就断开。
  233. taskEnd=True
  234. outstr_channel='%s ,taskEnd:%s'%(ChanellInfos,taskEnd)
  235. writeELK_log(msg=outstr_channel,fp=fp_log,line=sys._getframe().f_lineno,logger=logger)
  236. if outSource == 'NO':#离线没有推流
  237. taskEnd=True
  238. if taskEnd:
  239. if timeBreak1-timeBreak0 > 60:###超时结束
  240. writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='超时结束')
  241. else:
  242. writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='结束')
  243. if (outVideoDir!='NO'):
  244. if video_flag:vid_writer.release()
  245. else:
  246. vid_writer_OR.release();
  247. vid_writer_AI.release();
  248. outstr='Task ends:%.1f , msgid:%s,taskID:%s '%(timeBreak1-timeBreak0,taskId,msgId)
  249. writeELK_log(msg=outstr,fp=fp_log,line=sys._getframe().f_lineno,logger=logger)
  250. break
  251. ##执行到这里的一定是在线任务,在等待流的过程中要发送waiting
  252. time_kafka1 = time.time()
  253. if time_kafka1-time_kafka0>60:
  254. msg_res = copy.deepcopy(msg_dict_off);
  255. msg_res['msg_id']= msgId; msg_res['type']=2
  256. msg_res = json.dumps(msg_res, ensure_ascii=False)
  257. outStrList= get_infos(taskId, msgId,msg_res,key_str='Waiting stream restoring heartbeat')
  258. send_kafka(producer,kafka_par,msg_res,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  259. outstr='Waiting stream recovering:%.1f s'%(timeBreak1-timeBreak0)
  260. writeELK_log(msg=outstr,fp=fp_log,line=sys._getframe().f_lineno,logger=logger)
  261. writeELK_log(msg=outstr_channel,fp=fp_log,line=sys._getframe().f_lineno,logger=logger)
  262. time_kafka0 = time_kafka1
  263. #break###断流或者到终点
  264. time.sleep(5)
  265. print('Waiting stream for ',e)
  266. def lauch_process(gpuid,inSource,outSource,taskId,msgId,modelJson,kafka_par,channelIndex='LC001'):
  267. if outSource=='NO':
  268. streamName='off-%s-%s'%(taskId,msgId)
  269. else:
  270. streamName='live-%s-%s'%(taskId,msgId)
  271. dataPar ={
  272. 'imgData':'',
  273. 'imgName':'testW',
  274. 'streamName':streamName,
  275. 'taskId':taskId,
  276. 'msgId':msgId,
  277. 'channelIndex':channelIndex,
  278. 'device':str(gpuid),
  279. 'modelJson':modelJson,
  280. 'kafka_par':kafka_par,
  281. }
  282. #dataPar['inSource'] = 'http://images.5gai.taauav.com/video/8bc32984dd893930dabb2856eb92b4d1.mp4';dataPar['outSource'] = None
  283. dataPar['inSource'] = inSource;dataPar['outSource'] = outSource
  284. process_uid=''.join(random.sample(string.ascii_letters + string.digits, 16));dataPar['process_uid']=process_uid
  285. parent_conn, child_conn = multiprocessing.Pipe();dataPar['callback']=child_conn
  286. gpuProcess=Process(target=onlineModelProcess,name='process:%s'%( process_uid ),args=(dataPar,))
  287. gpuProcess.start()
  288. #print(dir(gpuProcess))
  289. #child_return = parent_conn.recv()
  290. #timestr2=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  291. #print(timestr2,'-'*20,'progress:%s ,msgId:%s , taskId:%s return:'%(process_uid,msgId,taskId),child_return)
  292. return gpuProcess
  293. msg_dict_offline = {
  294. "biz_id":"hehuzhang",
  295. "mod_id":"ai",
  296. "msg_id":'bb'+''.join(random.sample(string.ascii_letters ,30) ) ,
  297. "offering_id":"http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
  298. "offering_type":"mp4",
  299. "results_base_dir": "XJRW202203171535"+str(random.randint(10,99)),
  300. 'outSource':'NO'
  301. }
  302. taskStatus={}
  303. taskStatus['onLine'] = Queue(100)
  304. taskStatus['offLine']= Queue(100)
  305. taskStatus['pidInfos']= {}
  306. def get_msg_from_kafka(par):
  307. thread='master:readingKafka'
  308. outStrList={}
  309. fp_log = par['fp_log']
  310. logger=par['logger']
  311. consumer = KafkaConsumer(bootstrap_servers=par['server'],client_id='AI_server',group_id=par['group_id'],auto_offset_reset='latest')
  312. consumer.subscribe( par['topic'][0:2])
  313. outstr='reading kafka process starts'
  314. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  315. kafka_par ={ 'server':par['server'],'topic':par['topic'][2] }
  316. producer = KafkaProducer(
  317. bootstrap_servers=par['server'],#tencent yun
  318. value_serializer=lambda v: v.encode('utf-8'),
  319. metadata_max_age_ms=120000)
  320. for ii,msg in enumerate(consumer):
  321. ##读取消息
  322. try:
  323. taskInfos = eval(msg.value.decode('utf-8') )
  324. except:
  325. outstr='%s msg format error,value:%s,offset:%d partition:%s topic:%s'%('#'*20,msg.value,msg.offset,msg.topic,msg.topic)
  326. continue
  327. if msg.topic == par['topic'][0]: ##
  328. taskInfos['inSource']= taskInfos['pull_channel'];
  329. taskInfos['outSource']= get_push_address(taskInfos['push_channel']) ;
  330. taskStatus['onLine'].put( taskInfos )
  331. save_message(par['kafka'],taskInfos)
  332. ###发送状态信息waiting
  333. msg = create_status_msg(msg_dict_on,taskInfos,sts='waiting')
  334. outStrList=get_infos(taskInfos['results_base_dir'], taskInfos['msg_id'],msg,key_str='read msgs from kafka online task and response to kafka')
  335. send_kafka(producer,kafka_par,msg,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread);
  336. else:
  337. try:
  338. taskInfos['inSource']= taskInfos['offering_id'];
  339. taskInfos['outSource']= 'NO'
  340. taskStatus['offLine'].put( taskInfos )
  341. save_message(par['kafka'],taskInfos)
  342. ###发送状态信息waiting
  343. msg = create_status_msg(msg_dict_off,taskInfos,sts='waiting')
  344. outStrList=get_infos(taskInfos['results_base_dir'], taskInfos['msg_id'],msg,key_str='read msgs from kafka offline task and response to kafka')
  345. send_kafka(producer,kafka_par,msg,outStrList,fp_log ,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  346. except Exception as e:
  347. print('######msg Error######',msg,e)
  348. def detector(par):
  349. ####初始化信息列表
  350. kafka_par ={ 'server':par['server'],'topic':par['topic'][2] }
  351. producer = KafkaProducer(
  352. bootstrap_servers=par['server'],#tencent yun
  353. value_serializer=lambda v: v.encode('utf-8'),
  354. metadata_max_age_ms=120000)
  355. time_interval=par['logPrintInterval']
  356. logname='detector.log';thread='master:detector'
  357. fp_log=create_logFile(logdir=par['logDir'],name=logname)
  358. ##准备日志函数所需参数
  359. logger=par['logDir'].replace('/','.')+'.'+logname
  360. #wrtiteLog(fp_log,'########### detector process starts ######\n');
  361. outstr='detector process starts';sys._getframe().f_lineno
  362. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  363. ###开启kafka consumer 进程##
  364. parIn=copy.deepcopy(par);parIn['fp_log']=fp_log ;parIn['logger']=logger
  365. HeartProcess=Process(target=get_msg_from_kafka,name='process-consumer-kafka',args=(parIn,))
  366. HeartProcess.start()
  367. timeSleep=1
  368. time0=time.time()
  369. time0_kafQuery=time.time()
  370. time0_taskQuery=time.time()
  371. time0_sleep=time.time()
  372. outStrList={}
  373. while True:###每隔timeSleep秒,轮询一次
  374. time0_taskQuery,printFlag = check_time_interval(time0_taskQuery,time_interval)
  375. outstr_task= ' task queue onLine cnt:%d offLine:%d'%(taskStatus['onLine'].qsize(), taskStatus['offLine'].qsize())
  376. if (taskStatus['onLine'].qsize()>0) or (taskStatus['offLine'].qsize()>0):
  377. #outstr_task=wrtiteLog(fp_log,outstr_task);print( outstr_task);
  378. writeELK_log(msg=outstr_task,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  379. ##2-更新显卡信息
  380. gpuStatus = getGPUInfos()
  381. ##3-优先考虑在线任务
  382. if not taskStatus['onLine'].empty():
  383. ###3.1-先判断有没有空闲显卡:
  384. cuda = get_available_gpu(gpuStatus)
  385. ###获取在线任务信息,并执行,lauch process
  386. taskInfos = taskStatus['onLine'].get()
  387. outstr='start to process onLine taskId:%s msgId:%s'%( taskInfos['results_base_dir'],taskInfos['msg_id'] )
  388. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  389. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  390. if cuda: ###3.1.1 -有空余显卡
  391. #lauch process
  392. msg= copy.deepcopy(msg_dict_on);
  393. gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par,taskInfos['channel_code'])
  394. taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
  395. else:###3.1.2-没有显卡
  396. ##判断有没有显卡上面都是离线进程的
  397. cuda_pid = get_potential_gpu(gpuStatus,taskStatus['pidInfos'])
  398. print('###line478:',cuda_pid)
  399. if cuda_pid:#3.1.2.1 - ##如果有可以杀死的进程
  400. cuda = cuda_pid['cuda']
  401. pids = cuda_pid['pids']
  402. ##kill 离线进程,并更新离线任务表
  403. cnt_off_0 = taskStatus['offLine'].qsize()
  404. for pid in pids:
  405. ##kill 离线进程
  406. taskStatus['pidInfos'][pid]['gpuProcess'].kill()
  407. ##更新离线任务表
  408. taskStatus['offLine'].put( taskStatus['pidInfos'][pid]['taskInfos'] )
  409. taskInfos_off=taskStatus['pidInfos'][pid]['taskInfos']
  410. ##发送离线数据,说明状态变成waiting
  411. msg= msg_dict_off;
  412. msg=update_json(taskInfos_off,msg,offkeys=["msg_id","biz_id" ,"mod_id"] )
  413. msg['results'][0]['original_url']=taskInfos_off['inSource']
  414. msg['results'][0]['sign_url']=get_boradcast_address(taskInfos_off['outSource'])
  415. msg['status']='waiting'
  416. msg = json.dumps(msg, ensure_ascii=False)
  417. outStrList=get_infos(taskInfos_off['results_base_dir'], taskInfos_off['msg_id'],msg,key_str='start online task after kill offline tasks')
  418. send_kafka(producer,kafka_par,msg,outStrList,fp_log ,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  419. cnt_off_1 = taskStatus['offLine'].qsize()
  420. outstr='before killing process, offtask cnt:%d ,after killing, offtask cnt:%d '%(cnt_off_0,cnt_off_1)
  421. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  422. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  423. gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par,taskInfos['channel_code'])
  424. ###更新pidinfos,update pidInfos
  425. taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
  426. else:
  427. outstr='No available GPUs for onLine task'
  428. #outstr=wrtiteLog(fp_log,outstr);print(outstr);
  429. writeELK_log(msg=outstr,fp=fp_log,level='ERROR',thread=thread,line=sys._getframe().f_lineno,logger=logger)
  430. ##4-更新显卡信息
  431. gpuStatus = getGPUInfos()
  432. ##5-考虑离线任务
  433. if not taskStatus['offLine'].empty():
  434. cudaArrange= arrange_offlineProcess(gpuStatus,taskStatus['pidInfos'],modelMemory=1500)
  435. outstr='IN OFF LINE TASKS available cudas:%s'%(cudaArrange)
  436. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  437. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  438. for cuda in cudaArrange:
  439. if not taskStatus['offLine'].empty():
  440. taskInfos = taskStatus['offLine'].get()
  441. outstr='start to process offLine taskId:%s msgId:%s'%( taskInfos['results_base_dir'],taskInfos['msg_id'] )
  442. taskInfos['channel_code']='LC999'###离线消息没有这个参数
  443. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  444. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  445. gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par,taskInfos['channel_code'])
  446. taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'offLine','taskInfos':taskInfos}
  447. if get_whether_gpuProcess():
  448. time0_sleep,printFlag = check_time_interval(time0_sleep,time_interval)
  449. if printFlag:
  450. outstr= '*'*20 +'sleep '+'*'*20;
  451. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  452. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=sys._getframe().f_lineno,logger=logger)
  453. time.sleep(timeSleep)
  454. ####检查gpu子进程是否结束,如果结束要join(),否则会产生僵尸进程###
  455. #taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
  456. for key in list(taskStatus['pidInfos'].keys()):
  457. if not taskStatus['pidInfos'][key]['gpuProcess'].is_alive():
  458. taskStatus['pidInfos'][key]['gpuProcess'].join()
  459. taskStatus['pidInfos'].pop(key)
  460. print('########Program End#####')
  461. if __name__ == '__main__':
  462. par={};
  463. ###topic0--在线,topic1--离线
  464. #par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
  465. #101.132.127.1:19092
  466. '''
  467. par['server']='101.132.127.1:19092 ';par['topic']=('alg-online-tasks','alg-offline-tasks','alg-task-results');par['group_id']='test';
  468. par['kafka']='mintors/kafka'
  469. par['modelJson']='conf/model.json'
  470. '''
  471. masterFile="conf/master.json"
  472. assert os.path.exists(masterFile)
  473. with open(masterFile,'r') as fp:
  474. data=json.load(fp)
  475. par=data['par']
  476. print(par)
  477. detector(par)