用kafka接收消息
選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

master_0509.py 26KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. import numpy as np
  2. import time,ast,copy
  3. from flask import request, Flask,jsonify
  4. import base64,cv2,os,sys,json
  5. sys.path.extend(['../yolov5'])
  6. #from Send_tranfer import b64encode_function,JsonSend,name_dic,nameID_dic,getLogFileFp
  7. from segutils.segmodel import SegModel,get_largest_contours
  8. from models.experimental import attempt_load
  9. from utils.datasets import LoadStreams, LoadImages
  10. from utils.torch_utils import select_device, load_classifier, time_synchronized
  11. from queRiver import get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
  12. import subprocess as sp
  13. import matplotlib.pyplot as plt
  14. import torch,random,string
  15. import multiprocessing
  16. from multiprocessing import Process,Queue
  17. import traceback
  18. from kafka import KafkaProducer, KafkaConsumer,TopicPartition
  19. from kafka.errors import kafka_errors
  20. #torch.multiprocessing.set_start_method('spawn')
  21. import utilsK
  22. from utilsK.GPUtils import *
  23. from utilsK.masterUtils import *
  24. from utilsK.sendUtils import create_status_msg,update_json
  25. #from utilsK.modelEval import onlineModelProcess
  26. import random,string
  27. from Send_tranfer_oss import msg_dict_on,msg_dict_off
  28. import pykafka
  29. from pykafka import KafkaClient
# Module-level integer initialised to 0; no code visible in this section reads or updates it.
process_id=0
  31. def onlineModelProcess(parIn ):
  32. DEBUG=False
  33. streamName = parIn['streamName']
  34. childCallback=parIn['callback']
  35. outStrList={}
  36. #try:
  37. for wan in ['test']:
  38. jsonfile=parIn['modelJson']
  39. with open(jsonfile,'r') as fp:
  40. parAll = json.load(fp)
  41. Detweights=parAll['gpu_process']['det_weights']
  42. seg_nclass = parAll['gpu_process']['seg_nclass']
  43. Segweights = parAll['gpu_process']['seg_weights']
  44. videoSave = parAll['AI_video_save']
  45. imageTxtFile = parAll['imageTxtFile']
  46. taskId,msgId = streamName.split('-')[1:3]
  47. inSource,outSource=parIn['inSource'],parIn['outSource']
  48. ##构建日志文件
  49. if outSource != 'NO':
  50. logdir = parAll['logChildProcessOnline']
  51. waitingTime=parAll['StreamWaitingTime']
  52. else:
  53. logdir = parAll['logChildProcessOffline']
  54. waitingTime=5
  55. fp_log=create_logFile(logdir=logdir)
  56. kafka_par=parIn['kafka_par']
  57. producer = KafkaProducer(bootstrap_servers=kafka_par['server'],value_serializer=lambda v: v.encode('utf-8'),metadata_max_age_ms=120000)
  58. ####要先检查视频的有效性
  59. ###开始的时候,如果在线任务没有流,要发送的心跳消息,msg_h,
  60. msg_h= copy.deepcopy(msg_dict_off);
  61. msg_h['status']='waiting';msg_h['msg_id']=msgId
  62. if outSource == 'NO':
  63. msg_h['type']=1
  64. Stream_ok= get_fps_rtmp(inSource,video=True)
  65. else:
  66. msg_h['type']=2
  67. msg_h_d = json.dumps(msg_h, ensure_ascii=False)
  68. outStrList['success']= '%s waiting stream or video, send heartbeat: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h);
  69. outStrList['failure']='#######kafka ERROR waiting stream or video, send heartbeat'
  70. outStrList['Refailure']='##############kafka ERROR waiting stream or video, Re-send heartbeat'
  71. Stream_ok=check_stream(inSource,producer,kafka_par,msg_h_d,outStrList,fp_log ,timeMs=waitingTime)
  72. if Stream_ok:###发送开始信号
  73. msg_h['status']='running'
  74. msg_h_d = json.dumps(msg_h, ensure_ascii=False)
  75. outStrList['success']= '%s informing stream/video is ok, taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h);
  76. outStrList['failure']='#######kafka ERROR ,when informing stream/video is ok'
  77. outStrList['Refailure']='##############kafka ERROR, when re-informing stream/video is ok'
  78. send_kafka(producer,kafka_par,msg_h_d,outStrList,fp_log );
  79. else:
  80. ####检测离线视频是否有效,无效要报错
  81. outstr='############# offline vedio or live stream Error:%s #################'%(inSource)
  82. outstr=wrtiteLog(fp_log,outstr);print( outstr);
  83. msg_h['error']=str(1001);msg_h['status']='failed';
  84. msg_h_d = json.dumps(msg_h, ensure_ascii=False);
  85. outStrList['success']= '%s informing invaid video or stream success : taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h);
  86. outStrList['failure']='#######kafka ERROR, when informing invaid video or stream'
  87. outStrList['Refailure']='##############kafka ERROR,when re-informing invaid video or stream'
  88. send_kafka(producer,kafka_par,msg_h_d,outStrList,fp_log );
  89. childCallback.send(' offline vedio or live stream Error')
  90. continue
  91. if (inSource.endswith('.MP4')) or (inSource.endswith('.mp4')):
  92. fps,outW,outH,totalcnt=get_fps_rtmp(inSource,video=True)[0:4]
  93. else:
  94. fps,outW,outH,totalcnt=get_fps_rtmp(inSource,video=False)[0:4]
  95. fps = int(fps+0.5)
  96. if outSource != 'NO':
  97. command=['ffmpeg','-y','-f', 'rawvideo','-vcodec','rawvideo','-pix_fmt', 'bgr24',
  98. '-s', "{}x{}".format(outW,outH),# 图片分辨率
  99. '-r', str(fps),# 视频帧率
  100. '-i', '-','-c:v', 'libx264','-pix_fmt', 'yuv420p',
  101. '-f', 'flv',outSource
  102. ]
  103. video_flag = videoSave['onLine']
  104. logdir = parAll['logChildProcessOnline']
  105. waitingTime=parAll['StreamWaitingTime']
  106. else:
  107. video_flag = videoSave['offLine'] ;logdir = parAll['logChildProcessOffline']
  108. waitingTime=5
  109. fp_log=create_logFile(logdir=logdir)
  110. device = select_device(parIn['device'])
  111. half = device.type != 'cpu' # half precision only supported on CUDA
  112. model = attempt_load(Detweights, map_location=device) # load FP32 model
  113. if half: model.half()
  114. segmodel = SegModel(nclass=seg_nclass,weights=Segweights,device=device)
  115. # 管道配置,其中用到管道
  116. if outSource !='NO' :
  117. ppipe = sp.Popen(command, stdin=sp.PIPE)
  118. ##后处理参数
  119. par=parAll['post_process']
  120. conf_thres,iou_thres,classes=par['conf_thres'],par['iou_thres'],par['classes']
  121. outImaDir = par['outImaDir']
  122. outVideoDir = par['outVideoDir']
  123. labelnames=par['labelnames']
  124. rainbows=par['rainbows']
  125. fpsample = par['fpsample']
  126. names=get_labelnames(labelnames)
  127. label_arraylist = get_label_arrays(names,rainbows,outfontsize=40)
  128. dataset = LoadStreams(inSource, img_size=640, stride=32)
  129. childCallback.send('####model load success####')
  130. if (outVideoDir!='NO') and video_flag:
  131. msg_id = streamName.split('-')[2]
  132. save_path = os.path.join(outVideoDir,msg_id+'.MP4')
  133. vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH))
  134. iframe = 0;post_results=[];time_beg=time.time()
  135. t00=time.time()
  136. time_kafka0=time.time()
  137. for path, img, im0s, vid_cap in dataset:
  138. t0= time_synchronized()
  139. if not path:
  140. EndUrl='%s/%s_frame-9999-9999_type-结束_9999999999999999_s-%s_AI.jpg'%(outImaDir,time_str(),streamName)
  141. EndUrl = EndUrl.replace(' ','-').replace(':','-')
  142. img_end=np.zeros((100,100),dtype=np.uint8);cv2.imwrite(EndUrl,img_end)
  143. if imageTxtFile:
  144. EndUrl_txt = EndUrl.replace('.jpg','.txt')
  145. fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close()
  146. EndUrl='%s/%s_frame-9999-9999_type-结束_9999999999999999_s-%s_OR.jpg'%(outImaDir,time_str(),streamName)
  147. EndUrl = EndUrl.replace(' ','-').replace(':','-')
  148. ret = cv2.imwrite(EndUrl,img_end)
  149. if imageTxtFile:
  150. EndUrl_txt = EndUrl.replace('.jpg','.txt')
  151. fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close()
  152. #print(EndUrl,ret)
  153. childCallback.send('####strem ends####')
  154. if (outVideoDir!='NO') and video_flag:
  155. vid_writer.release()
  156. break###断流或者到终点
  157. if outSource == 'NO':###如果不推流,则显示进度条
  158. view_bar(iframe,totalcnt,time_beg ,parIn['process_uid'] )
  159. ###直播和离线都是1分钟发一次消息。直播发
  160. time_kafka1 = time.time()
  161. if time_kafka1 - time_kafka0 >60:
  162. time_kafka0 = time_kafka1
  163. ###发送状态信息waiting
  164. msg = copy.deepcopy(msg_dict_off);
  165. msg['msg_id']= msgId; msg
  166. if outSource == 'NO':
  167. msg['progressbar']= '%.4f'%(iframe*1.0/totalcnt)
  168. msg['type']=1
  169. else:
  170. msg['progressbarOn']= str(iframe)
  171. msg['type']=2
  172. msg = json.dumps(msg, ensure_ascii=False)
  173. '''
  174. try:
  175. record_metadata = producer.send(kafka_par['topic'], msg).get()
  176. outstr='%s processing send progressbar or heartBeat to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg);
  177. wrtiteLog(fp_log,outstr);print( outstr);
  178. except Exception as e:
  179. outstr='#######kafka ERROR when processing sending progressbar or heartBeat:, error: %s'%(str(e))
  180. wrtiteLog(fp_log,outstr);print( outstr);
  181. try:
  182. producer = KafkaProducer(bootstrap_servers=par['server'], value_serializer=lambda v: v.encode('utf-8')).get()
  183. future = producer.send(par['topic'][2], msg).get()
  184. except Exception as e:
  185. outstr='%s re-send progressbar or heartBeat kafka,processing video or stream: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg);
  186. wrtiteLog(fp_log,outstr);print( outstr);
  187. '''
  188. ###发送状态信息waiting
  189. outStrList['success']= '%s processing send progressbar or heartBeat to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg);
  190. outStrList['failure']='#######kafka ERROR when processing sending progressbar or heartBeat'
  191. outStrList['Refailure']='%s re-send progressbar or heartBeat kafka,processing video or stream: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg);
  192. send_kafka(producer,kafka_par,msg,outStrList,fp_log );
  193. time0=time.time()
  194. iframe +=1
  195. time1=time.time()
  196. img = torch.from_numpy(img).to(device)
  197. img = img.half() if half else img.float() # uint8 to fp16/32
  198. img /= 255.0 # 0 - 255 to 0.0 - 1.0
  199. timeseg0 = time.time()
  200. seg_pred,segstr = segmodel.eval(im0s[0] )
  201. timeseg1 = time.time()
  202. t1= time_synchronized()
  203. pred = model(img,augment=False)[0]
  204. time4 = time.time()
  205. datas = [path, img, im0s, vid_cap,pred,seg_pred,iframe]
  206. p_result,timeOut = post_process_(datas,conf_thres, iou_thres,names,label_arraylist,rainbows,iframe)
  207. t2= time_synchronized()
  208. #print('###line138:',timeOut,outSource,outVideoDir)
  209. ##每隔 fpsample帧处理一次,如果有问题就保存图片
  210. if (iframe % fpsample == 0) and (len(post_results)>0) :
  211. parImage=save_problem_images(post_results,iframe,names,streamName=streamName,outImaDir='problems/images_tmp',imageTxtFile=imageTxtFile)
  212. post_results=[]
  213. if len(p_result[2] )>0: ##
  214. post_results.append(p_result)
  215. t3= time_synchronized()
  216. image_array = p_result[1]
  217. if outSource!='NO':
  218. ppipe.stdin.write(image_array.tobytes())
  219. if (outVideoDir!='NO') and video_flag:
  220. ret = vid_writer.write(image_array)
  221. t4= time_synchronized()
  222. timestr2 = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
  223. if iframe%100==0:
  224. outstr='%s,,read:%.1f ms,copy:%.1f, infer:%.1f ms, detinfer:%.1f ms,draw:%.1f ms, save:%.1f ms total:%.1f ms \n'%(timestr2,(t0 - t00)*1000,(timeseg0-t0)*1000, (t1 - timeseg0)*1000,(t2-t1)*1000, (t3 - t2)*1000,(t4-t3)*1000, (t4-t00)*1000)
  225. wrtiteLog(fp_log,outstr);
  226. #print(outstr)
  227. t00 = t4;
  228. ##模型加载之类的错误
  229. #except Exception as e:
  230. # print(time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()) ,'*'*20,'###line177 ERROR:',e)
  231. # childCallback.send(e) #将异常通过管道送出
  232. def lauch_process(gpuid,inSource,outSource,taskId,msgId,modelJson,kafka_par):
  233. if outSource=='NO':
  234. streamName='off-%s-%s'%(taskId,msgId)
  235. else:
  236. streamName='live-%s-%s'%(taskId,msgId)
  237. dataPar ={
  238. 'imgData':'',
  239. 'imgName':'testW',
  240. 'streamName':streamName,
  241. 'taskId':taskId,
  242. 'msgId':msgId,
  243. 'device':str(gpuid),
  244. 'modelJson':modelJson,
  245. 'kafka_par':kafka_par,
  246. }
  247. #dataPar['inSource'] = 'http://images.5gai.taauav.com/video/8bc32984dd893930dabb2856eb92b4d1.mp4';dataPar['outSource'] = None
  248. dataPar['inSource'] = inSource;dataPar['outSource'] = outSource
  249. process_uid=''.join(random.sample(string.ascii_letters + string.digits, 16));dataPar['process_uid']=process_uid
  250. parent_conn, child_conn = multiprocessing.Pipe();dataPar['callback']=child_conn
  251. gpuProcess=Process(target=onlineModelProcess,name='process:%s'%( process_uid ),args=(dataPar,))
  252. gpuProcess.start()
  253. #print(dir(gpuProcess))
  254. #child_return = parent_conn.recv()
  255. #timestr2=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  256. #print(timestr2,'-'*20,'progress:%s ,msgId:%s , taskId:%s return:'%(process_uid,msgId,taskId),child_return)
  257. return gpuProcess
  258. msg_dict_offline = {
  259. "biz_id":"hehuzhang",
  260. "mod_id":"ai",
  261. "msg_id":'bb'+''.join(random.sample(string.ascii_letters ,30) ) ,
  262. "offering_id":"http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
  263. "offering_type":"mp4",
  264. "results_base_dir": "XJRW202203171535"+str(random.randint(10,99)),
  265. 'outSource':'NO'
  266. }
  267. def detector(par):
  268. ####初始化信息列表
  269. consumer = KafkaConsumer(bootstrap_servers=par['server'],client_id='AI_server',group_id=par['group_id'],auto_offset_reset='earliest')
  270. consumer.subscribe( par['topic'][0:2])
  271. '''
  272. client = KafkaClient(hosts=par['server'])
  273. consumer_pys=[]
  274. for topic_name in par['topic'][0:2]:
  275. consumer_pys.append(client.topics[ topic_name ].get_simple_consumer(consumer_group=par['group_id'],timeout=30))
  276. '''
  277. kafka_par ={ 'server':par['server'],'topic':par['topic'][2] }
  278. producer = KafkaProducer(
  279. bootstrap_servers=par['server'],#tencent yun
  280. value_serializer=lambda v: v.encode('utf-8'),
  281. metadata_max_age_ms=120000)
  282. taskStatus={}
  283. taskStatus['onLine'] = Queue(100)
  284. taskStatus['offLine']= Queue(100)
  285. taskStatus['pidInfos']= {}
  286. fp_log=create_logFile(logdir=par['logDir'])
  287. wrtiteLog(fp_log,'###########masster starts in line222######\n')
  288. timeSleep=1
  289. #taskStatus['pidInfos'][31897]={'gpuProcess':'onlineProcess','type':'onLine'}
  290. time0=time.time()
  291. time0_kafQuery=time.time()
  292. time0_taskQuery=time.time()
  293. time0_sleep=time.time()
  294. time_interval=10; outStrList={}
  295. while True:###每隔timeSleep秒,轮询一次
  296. #for isleep in range(1):
  297. '''
  298. ##1-读取kafka,更新任务类别
  299. try:
  300. msgs = getAllRecords(consumer,par['topic'])
  301. except Exception as e:
  302. outstr='%s kafka connecting error:%s '%('#'*20,e)
  303. outstr=wrtiteLog(fp_log,outstr);print( outstr);
  304. time.sleep(timeSleep)
  305. continue
  306. #if get_whether_gpuProcess():
  307. time0_kafQuery,printFlag = check_time_interval(time0_kafQuery,time_interval)
  308. if printFlag:
  309. outstr_kafka=' ##### kafka Left %d records####'%(len(msgs));
  310. outstr_kafka=wrtiteLog(fp_log,outstr_kafka)
  311. '''
  312. for ii,msg in enumerate(consumer):
  313. #for ii,msg in enumerate(msgs):
  314. ##读取消息
  315. try:
  316. taskInfos = eval(msg.value.decode('utf-8') )
  317. except:
  318. outstr='%s msg format error,value:%s,offset:%d partition:%s topic:%s'%('#'*20,msg.value,msg.offset,msg.topic,msg.topic)
  319. continue
  320. if msg.topic == par['topic'][0]: ##
  321. taskInfos['inSource']= taskInfos['pull_channel'];
  322. taskInfos['outSource']= get_push_address(taskInfos['push_channel']) ;
  323. taskStatus['onLine'].put( taskInfos )
  324. save_message(par['kafka'],taskInfos)
  325. ###发送状态信息waiting
  326. msg = create_status_msg(msg_dict_on,taskInfos,sts='waiting')
  327. outStrList['success']= '%s read from kafka online task and back to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskInfos['results_base_dir'], taskInfos['msg_id'],msg)
  328. outStrList['failure']='#######kafka ERROR when read from kafka online task and back to kafka'
  329. outStrList['Refailure']='##############kafka ERROR when read from kafka online task and resend back to kafka:'
  330. send_kafka(producer,kafka_par,msg,outStrList,fp_log );
  331. else:
  332. taskInfos['inSource']= taskInfos['offering_id'];
  333. taskInfos['outSource']= 'NO'
  334. taskStatus['offLine'].put( taskInfos )
  335. save_message(par['kafka'],taskInfos)
  336. ###发送状态信息waiting
  337. msg = create_status_msg(msg_dict_off,taskInfos,sts='waiting')
  338. outStrList['success']= '%s read from kafka offline task and back to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskInfos['results_base_dir'], taskInfos['msg_id'],msg)
  339. outStrList['failure']='#######kkafka ERROR when read from kafka offline task and back to kafka:,'
  340. outStrList['Refailure']='##############kafka ERROR when read from kafka offline task and resend back to kafka:'
  341. send_kafka(producer,kafka_par,msg,outStrList,fp_log );
  342. time0_taskQuery,printFlag = check_time_interval(time0_taskQuery,time_interval)
  343. outstr_task= ' task queue onLine cnt:%d offLine:%d'%(taskStatus['onLine'].qsize(), taskStatus['offLine'].qsize())
  344. ##2-更新显卡信息
  345. gpuStatus = getGPUInfos()
  346. ##3-优先考虑在线任务
  347. if not taskStatus['onLine'].empty():
  348. ###3.1-先判断有没有空闲显卡:
  349. cuda = get_available_gpu(gpuStatus)
  350. ###获取在线任务信息,并执行,lauch process
  351. taskInfos = taskStatus['onLine'].get()
  352. print('################396',cuda)
  353. if cuda: ###3.1.1 -有空余显卡
  354. #lauch process
  355. msg= copy.deepcopy(msg_dict_on);
  356. gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par)
  357. taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
  358. else:###3.1.2-没有显卡
  359. ##判断有没有显卡上面都是离线进程的
  360. cuda_pid = get_potential_gpu(gpuStatus,taskStatus['pidInfos'])
  361. if cuda_pid:#3.1.2.1 - ##如果有可以杀死的进程
  362. cuda = cuda_pid['cuda']
  363. pids = cuda_pid['pids']
  364. ##kill 离线进程,并更新离线任务表
  365. cnt_off_0 = taskStatus['offLine'].qsize()
  366. for pid in pids:
  367. ##kill 离线进程
  368. taskStatus['pidInfos'][pid]['gpuProcess'].kill()
  369. ##更新离线任务表
  370. taskStatus['offLine'].put( taskStatus['pidInfos'][pid]['taskInfos'] )
  371. taskInfos_off=taskStatus['pidInfos'][pid]['taskInfos']
  372. ##发送离线数据,说明状态变成waiting
  373. msg= msg_dict_off;
  374. msg=update_json(taskInfos_off,msg,offkeys=["msg_id","biz_id" ,"mod_id"] )
  375. msg['results'][0]['original_url']=taskInfos_off['inSource']
  376. msg['results'][0]['sign_url']=get_boradcast_address(taskInfos_off['outSource'])
  377. msg['status']='waiting'
  378. msg = json.dumps(msg, ensure_ascii=False)
  379. outStrList['success']= '%s start online task after kill offline tasks and back to kafka: pid:%d taskId:%s msgId:%s send:%s'%('-'*20,gpuProcess.pid,taskInfos_off['results_base_dir'], taskInfos_off['msg_id'],msg)
  380. outStrList['failure']='#######kafka ERROR when start online task after kill offline tasks and back to kafka'
  381. outStrList['Refailure']='##############kkafka ERROR when start online task after kill offline tasks and back to kafka'
  382. send_kafka(producer,kafka_par,msg,outStrList,fp_log );
  383. cnt_off_1 = taskStatus['offLine'].qsize()
  384. outstr='%s before killing process, offtask cnt:%d ,after killing, offtask cnt:%d %s'%('-'*20 ,cnt_off_0,cnt_off_1,'*'*20)
  385. outstr=wrtiteLog(fp_log,outstr);print( outstr);
  386. gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par)
  387. ###更新pidinfos,update pidInfos
  388. taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
  389. else:
  390. outstr='######No available GPUs for onLine####'
  391. outstr=wrtiteLog(fp_log,outstr);print( outstr);
  392. ##4-更新显卡信息
  393. gpuStatus = getGPUInfos()
  394. ##5-考虑离线任务
  395. if not taskStatus['offLine'].empty():
  396. cudaArrange= arrange_offlineProcess(gpuStatus,taskStatus['pidInfos'],modelMemory=1500)
  397. outstr='###line342 IN OFF LINE TASKS available cudas:%s'%(cudaArrange)
  398. outstr=wrtiteLog(fp_log,outstr);print( outstr);
  399. for cuda in cudaArrange:
  400. if not taskStatus['offLine'].empty():
  401. taskInfos = taskStatus['offLine'].get()
  402. gpuProcess=lauch_process(cuda,taskInfos['inSource'],taskInfos['outSource'],taskInfos['results_base_dir'],taskInfos['msg_id'],par['modelJson'],kafka_par)
  403. taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'offLine','taskInfos':taskInfos}
  404. if get_whether_gpuProcess():
  405. time0_sleep,printFlag = check_time_interval(time0_sleep,time_interval)
  406. if printFlag:
  407. timestr2=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  408. outstr= timestr2 + '*'*20 +'sleep '+'*'*20;
  409. outstr=wrtiteLog(fp_log,outstr);print( outstr);
  410. outstr_task=wrtiteLog(fp_log,outstr_task);print( outstr_task);
  411. time.sleep(timeSleep)
  412. print('########sleep 1s #####')
  413. print('########Program End#####')
  414. if __name__ == '__main__':
  415. par={};
  416. ###topic0--在线,topic1--离线
  417. #par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
  418. #101.132.127.1:19092
  419. '''
  420. par['server']='101.132.127.1:19092 ';par['topic']=('alg-online-tasks','alg-offline-tasks','alg-task-results');par['group_id']='test';
  421. par['kafka']='mintors/kafka'
  422. par['modelJson']='conf/model.json'
  423. '''
  424. masterFile="conf/master.json"
  425. assert os.path.exists(masterFile)
  426. with open(masterFile,'r') as fp:
  427. data=json.load(fp)
  428. par=data['par']
  429. print(par)
  430. detector(par)