# Source: DSPRiverInspection/code_bak/master_0508.py (646 lines, 31 KiB, Python)
#
# NOTE: the repository viewer flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters
# (for example full-width punctuation inside comments). If that is intentional
# the warning can be ignored; otherwise inspect the non-ASCII characters
# before editing.

import numpy as np
import time,ast,copy
from flask import request, Flask,jsonify
import base64,cv2,os,sys,json
sys.path.extend(['../yolov5'])
#from Send_tranfer import b64encode_function,JsonSend,name_dic,nameID_dic,getLogFileFp
from segutils.segmodel import SegModel,get_largest_contours
from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.torch_utils import select_device, load_classifier, time_synchronized
from queRiver import get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
import subprocess as sp
import matplotlib.pyplot as plt
import torch,random,string
import multiprocessing
from multiprocessing import Process,Queue
import traceback
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
from kafka.errors import kafka_errors
#torch.multiprocessing.set_start_method('spawn')
import utilsK
from utilsK.GPUtils import *
from utilsK.masterUtils import *
from utilsK.sendUtils import create_status_msg,update_json
#from utilsK.modelEval import onlineModelProcess
import random,string
from Send_tranfer_oss import msg_dict_on,msg_dict_off
import pykafka
from pykafka import KafkaClient
# Module-level process id placeholder, initialised to 0; it is not read by any
# code in the visible part of this file.
process_id=0
def _write_end_marker(out_dir, stream_name, tag, image, write_txt):
    """Write one end-of-stream marker image, plus an optional .txt sidecar.

    Args:
        out_dir: directory prefix used for the marker file name.
        stream_name: stream identifier embedded in the file name.
        tag: file-name suffix, 'AI' or 'OR' in the caller.
        image: array handed to ``cv2.imwrite``.
        write_txt: when truthy, also write a ``.txt`` file that contains the
            marker image path followed by a newline.
    """
    end_url = '%s/%s_frame-9999-9999_type-结束_9999999999999999_s-%s_%s.jpg' % (
        out_dir, time_str(), stream_name, tag)
    end_url = end_url.replace(' ', '-').replace(':', '-')
    cv2.imwrite(end_url, image)
    if write_txt:
        with open(end_url.replace('.jpg', '.txt'), 'w') as fp_t:
            fp_t.write(end_url + '\n')


def onlineModelProcess(parIn):
    """Child-process entry point: run detection + segmentation on one source.

    Reads the model configuration JSON named by ``parIn['modelJson']``, checks
    that the input video/stream is usable, reports status to Kafka, then
    iterates over the frames, running the segmentation model and the detector
    on each one. Annotated frames are piped to ffmpeg when ``outSource`` is
    not ``'NO'`` and/or written to a local video file when configured.

    Args:
        parIn: dict with at least the keys ``streamName``, ``callback`` (an
            object with ``send``), ``modelJson``, ``inSource``, ``outSource``,
            ``kafka_par``, ``device`` and ``process_uid``. ``taskId`` and
            ``msgId`` are used when present; otherwise they are recovered
            from ``streamName`` (``<prefix>-<taskId>-<msgId>``).

    Returns:
        None. Progress and errors are reported through Kafka, the log file
        and ``parIn['callback']``.
    """
    streamName = parIn['streamName']
    childCallback = parIn['callback']
    outStrList = {}

    with open(parIn['modelJson'], 'r') as fp:
        parAll = json.load(fp)
    gpu_cfg = parAll['gpu_process']
    Detweights = gpu_cfg['det_weights']
    seg_nclass = gpu_cfg['seg_nclass']
    Segweights = gpu_cfg['seg_weights']
    videoSave = parAll['AI_video_save']
    imageTxtFile = parAll['imageTxtFile']

    # Prefer the ids passed explicitly: splitting streamName on '-' yields
    # wrong values as soon as an id itself contains a hyphen.
    name_parts = streamName.split('-')
    taskId = str(parIn['taskId']) if 'taskId' in parIn else name_parts[1]
    msgId = str(parIn['msgId']) if 'msgId' in parIn else name_parts[2]

    inSource, outSource = parIn['inSource'], parIn['outSource']
    is_online = outSource != 'NO'

    # Build the log file (created once; the handle is reused for the whole run).
    if is_online:
        logdir = parAll['logChildProcessOnline']
        waitingTime = parAll['StreamWaitingTime']
        video_flag = videoSave['onLine']
    else:
        logdir = parAll['logChildProcessOffline']
        waitingTime = 5
        video_flag = videoSave['offLine']
    fp_log = create_logFile(logdir=logdir)

    kafka_par = parIn['kafka_par']
    producer = KafkaProducer(bootstrap_servers=kafka_par['server'],
                             value_serializer=lambda v: v.encode('utf-8'),
                             metadata_max_age_ms=120000)

    # First check that the video/stream is valid. msg_h is the heartbeat
    # message sent while an online task is still waiting for its stream.
    msg_h = copy.deepcopy(msg_dict_off)
    msg_h['status'] = 'waiting'
    msg_h['msg_id'] = msgId
    if not is_online:
        msg_h['type'] = 1
        Stream_ok = get_fps_rtmp(inSource, video=True)
    else:
        msg_h['type'] = 2
        msg_h_d = json.dumps(msg_h, ensure_ascii=False)
        outStrList['success'] = '%s waiting stream or video, send heartbeat: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h)
        outStrList['failure'] = '#######kafka ERROR waiting stream or video, send heartbeat'
        outStrList['Refailure'] = '##############kafka ERROR waiting stream or video, Re-send heartbeat'
        Stream_ok = check_stream(inSource, producer, kafka_par, msg_h_d, outStrList, fp_log, timeMs=waitingTime)

    if Stream_ok:
        # Send the start signal.
        msg_h['status'] = 'running'
        msg_h_d = json.dumps(msg_h, ensure_ascii=False)
        outStrList['success'] = '%s informing stream/video is ok, taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h)
        outStrList['failure'] = '#######kafka ERROR ,when informing stream/video is ok'
        outStrList['Refailure'] = '##############kafka ERROR, when re-informing stream/video is ok'
        send_kafka(producer, kafka_par, msg_h_d, outStrList, fp_log)
    else:
        # Invalid offline video or live stream: report the failure and stop.
        outstr = '############# offline vedio or live stream Error:%s #################'%(inSource)
        outstr = wrtiteLog(fp_log, outstr)
        print(outstr)
        msg_h['error'] = str(1001)
        msg_h['status'] = 'failed'
        msg_h_d = json.dumps(msg_h, ensure_ascii=False)
        outStrList['success'] = '%s informing invaid video or stream success : taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h)
        outStrList['failure'] = '#######kafka ERROR, when informing invaid video or stream'
        outStrList['Refailure'] = '##############kafka ERROR,when re-informing invaid video or stream'
        send_kafka(producer, kafka_par, msg_h_d, outStrList, fp_log)
        childCallback.send(' offline vedio or live stream Error')
        return

    is_video_file = inSource.endswith(('.MP4', '.mp4'))
    fps, outW, outH, totalcnt = get_fps_rtmp(inSource, video=is_video_file)[0:4]
    fps = int(fps + 0.5)

    if is_online:
        # ffmpeg reads raw BGR frames from stdin and pushes them as FLV.
        command = ['ffmpeg', '-y', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24',
                   '-s', "{}x{}".format(outW, outH),  # frame resolution
                   '-r', str(fps),  # frame rate
                   '-i', '-', '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
                   '-f', 'flv', outSource
                   ]

    device = select_device(parIn['device'])
    half = device.type != 'cpu'  # half precision only supported on CUDA
    model = attempt_load(Detweights, map_location=device)  # load FP32 model
    if half:
        model.half()
    segmodel = SegModel(nclass=seg_nclass, weights=Segweights, device=device)

    # Start the ffmpeg pipe only after the models are loaded.
    if is_online:
        ppipe = sp.Popen(command, stdin=sp.PIPE)

    # Post-processing parameters.
    par = parAll['post_process']
    conf_thres, iou_thres, classes = par['conf_thres'], par['iou_thres'], par['classes']
    outImaDir = par['outImaDir']
    outVideoDir = par['outVideoDir']
    labelnames = par['labelnames']
    rainbows = par['rainbows']
    fpsample = par['fpsample']
    names = get_labelnames(labelnames)
    label_arraylist = get_label_arrays(names, rainbows, outfontsize=40)

    dataset = LoadStreams(inSource, img_size=640, stride=32)
    childCallback.send('####model load success####')

    save_video = (outVideoDir != 'NO') and video_flag
    if save_video:
        save_path = os.path.join(outVideoDir, msgId + '.MP4')
        vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW, outH))

    iframe = 0
    post_results = []
    time_beg = time.time()
    t00 = time.time()
    time_kafka0 = time.time()
    for path, img, im0s, vid_cap in dataset:
        t0 = time_synchronized()
        if not path:
            # Stream broke or the video reached its end: drop the two
            # end-marker images ("AI" and "OR") and release the outputs.
            img_end = np.zeros((100, 100), dtype=np.uint8)
            _write_end_marker(outImaDir, streamName, 'AI', img_end, imageTxtFile)
            _write_end_marker(outImaDir, streamName, 'OR', img_end, imageTxtFile)
            childCallback.send('####strem ends####')
            if save_video:
                vid_writer.release()
            if is_online:
                # Closing stdin signals end-of-input to ffmpeg.
                ppipe.stdin.close()
            break

        if not is_online:
            # Not pushing a stream: show a progress bar instead.
            view_bar(iframe, totalcnt, time_beg, parIn['process_uid'])

        # Both live and offline tasks report to Kafka once per minute.
        time_kafka1 = time.time()
        if time_kafka1 - time_kafka0 > 60:
            time_kafka0 = time_kafka1
            msg = copy.deepcopy(msg_dict_off)
            msg['msg_id'] = msgId
            if not is_online:
                # Guard against a source whose frame count is reported as 0.
                progress = iframe * 1.0 / totalcnt if totalcnt else 0.0
                msg['progressbar'] = '%.4f' % progress
                msg['type'] = 1
            else:
                msg['progressbarOn'] = str(iframe)
                msg['type'] = 2
            msg = json.dumps(msg, ensure_ascii=False)
            outStrList['success'] = '%s processing send progressbar or heartBeat to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg)
            outStrList['failure'] = '#######kafka ERROR when processing sending progressbar or heartBeat'
            outStrList['Refailure'] = '%s re-send progressbar or heartBeat kafka,processing video or stream: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg)
            send_kafka(producer, kafka_par, msg, outStrList, fp_log)

        iframe += 1
        img = torch.from_numpy(img).to(device)
        img = img.half() if half else img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0

        timeseg0 = time.time()
        seg_pred, segstr = segmodel.eval(im0s[0])
        t1 = time_synchronized()
        # Inference only: no autograd graph is needed for the detector.
        with torch.no_grad():
            pred = model(img, augment=False)[0]

        datas = [path, img, im0s, vid_cap, pred, seg_pred, iframe]
        p_result, timeOut = post_process_(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe)
        t2 = time_synchronized()

        # Every `fpsample` frames, save images for the problems collected so far.
        # NOTE(review): the output directory is hard-coded here instead of
        # using `outImaDir` from the config — confirm this is intended.
        if (iframe % fpsample == 0) and (len(post_results) > 0):
            save_problem_images(post_results, iframe, names, streamName=streamName, outImaDir='problems/images_tmp', imageTxtFile=imageTxtFile)
            post_results = []
        if len(p_result[2]) > 0:
            post_results.append(p_result)
        t3 = time_synchronized()

        image_array = p_result[1]
        if is_online:
            ppipe.stdin.write(image_array.tobytes())
        if save_video:
            vid_writer.write(image_array)
        t4 = time_synchronized()

        if iframe % 100 == 0:
            timestr2 = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
            outstr = '%s,,read:%.1f ms,copy:%.1f, infer:%.1f ms, detinfer:%.1f ms,draw:%.1f ms, save:%.1f ms total:%.1f ms \n'%(timestr2,(t0 - t00)*1000,(timeseg0-t0)*1000, (t1 - timeseg0)*1000,(t2-t1)*1000, (t3 - t2)*1000,(t4-t3)*1000, (t4-t00)*1000)
            wrtiteLog(fp_log, outstr)
        # Baseline for the next frame's "read" and "total" timings.
        t00 = t4
def lauch_process(gpuid,inSource,outSource,taskId,msgId,modelJson,kafka_par):
    """Start one ``onlineModelProcess`` child process and return its handle.

    The stream name is prefixed with ``off`` when ``outSource`` is ``'NO'``
    and with ``live`` otherwise. A random 16-character uid names the process,
    and the child end of a freshly created pipe is handed over as the
    ``callback`` entry of the parameter dict.

    Returns:
        The started ``multiprocessing.Process`` object.
    """
    name_template = 'off-%s-%s' if outSource == 'NO' else 'live-%s-%s'
    stream_label = name_template % (taskId, msgId)

    uid_alphabet = string.ascii_letters + string.digits
    uid = ''.join(random.sample(uid_alphabet, 16))

    # Only the child end is passed on; the parent end is not read here.
    _parent_end, child_end = multiprocessing.Pipe()

    worker_args = {
        'imgData': '',
        'imgName': 'testW',
        'streamName': stream_label,
        'taskId': taskId,
        'msgId': msgId,
        'device': str(gpuid),
        'modelJson': modelJson,
        'kafka_par': kafka_par,
        'inSource': inSource,
        'outSource': outSource,
        'process_uid': uid,
        'callback': child_end,
    }

    worker = Process(
        target=onlineModelProcess,
        name='process:%s' % (uid),
        args=(worker_args,),
    )
    worker.start()
    return worker
# Sample offline-task message. The message id is 'bb' followed by 30 distinct
# random ASCII letters, and the results directory name ends in a random
# two-digit number; both are drawn once, at import time.
msg_dict_offline = dict(
    biz_id="hehuzhang",
    mod_id="ai",
    msg_id="bb%s" % "".join(random.sample(string.ascii_letters, 30)),
    offering_id="http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
    offering_type="mp4",
    results_base_dir="XJRW202203171535%s" % str(random.randint(10, 99)),
    outSource='NO',
)
def detector(par):
    """Master loop: read tasks from Kafka and schedule them onto GPUs.

    Subscribes to the first two topics in ``par['topic']`` (index 0 = online
    tasks, index 1 = offline tasks) and publishes status messages to the
    third. Each pass of the loop:

    1. pulls new task records and queues them as online or offline tasks,
       acknowledging each with a 'waiting' status message;
    2. starts one queued online task, on a free GPU if there is one, else on
       a GPU freed by killing its offline processes (the killed offline
       tasks are re-queued);
    3. starts queued offline tasks on the GPUs returned by
       ``arrange_offlineProcess``.

    Args:
        par: configuration dict; this function reads the keys ``server``,
            ``topic``, ``group_id``, ``logDir``, ``kafka`` and ``modelJson``.

    Returns:
        Does not return under normal operation (infinite polling loop).
    """
    consumer = KafkaConsumer(bootstrap_servers=par['server'], client_id='AI_server', group_id=par['group_id'], auto_offset_reset='earliest')
    consumer.subscribe(par['topic'][0:2])

    kafka_par = {'server': par['server'], 'topic': par['topic'][2]}
    producer = KafkaProducer(
        bootstrap_servers=par['server'],  # tencent yun
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000)

    taskStatus = {
        'onLine': Queue(100),
        'offLine': Queue(100),
        # pid -> {'gpuProcess': Process, 'type': 'onLine'|'offLine', 'taskInfos': dict}
        'pidInfos': {},
    }

    fp_log = create_logFile(logdir=par['logDir'])
    wrtiteLog(fp_log, '###########masster starts in line222######\n')

    timeSleep = 1
    time0_kafQuery = time.time()
    time0_taskQuery = time.time()
    time0_sleep = time.time()
    time_interval = 10
    outStrList = {}

    while True:  # poll once every `timeSleep` seconds
        # 1 - read Kafka and update the task queues
        try:
            msgs = getAllRecords(consumer, par['topic'])
        except Exception as e:
            outstr = '%s kafka connecting error:%s '%('#'*20,e)
            outstr = wrtiteLog(fp_log, outstr)
            print(outstr)
            time.sleep(timeSleep)
            continue

        time0_kafQuery, printFlag = check_time_interval(time0_kafQuery, time_interval)
        if printFlag:
            outstr_kafka = ' ##### kafka Left %d records####'%(len(msgs))
            wrtiteLog(fp_log, outstr_kafka)

        for record in msgs:
            try:
                # SECURITY NOTE(review): eval() executes arbitrary expressions
                # taken from the Kafka payload. If producers always send plain
                # dict literals / JSON, switch to ast.literal_eval or
                # json.loads after confirming the payload format.
                taskInfos = eval(record.value.decode('utf-8'))
            except Exception:
                # Malformed record: log it (with the real partition) and skip.
                outstr = '%s msg format error,value:%s,offset:%d partition:%s topic:%s'%('#'*20,record.value,record.offset,record.partition,record.topic)
                outstr = wrtiteLog(fp_log, outstr)
                print(outstr)
                continue

            if record.topic == par['topic'][0]:
                # Online (live stream) task.
                taskInfos['inSource'] = taskInfos['pull_channel']
                taskInfos['outSource'] = get_push_address(taskInfos['push_channel'])
                taskStatus['onLine'].put(taskInfos)
                save_message(par['kafka'], taskInfos)
                # Acknowledge with status 'waiting'.
                status_msg = create_status_msg(msg_dict_on, taskInfos, sts='waiting')
                outStrList['success'] = '%s read from kafka online task and back to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskInfos['results_base_dir'], taskInfos['msg_id'],status_msg)
                outStrList['failure'] = '#######kafka ERROR when read from kafka online task and back to kafka'
                outStrList['Refailure'] = '##############kafka ERROR when read from kafka online task and resend back to kafka:'
                send_kafka(producer, kafka_par, status_msg, outStrList, fp_log)
            else:
                # Offline (video file) task.
                taskInfos['inSource'] = taskInfos['offering_id']
                taskInfos['outSource'] = 'NO'
                taskStatus['offLine'].put(taskInfos)
                save_message(par['kafka'], taskInfos)
                # Acknowledge with status 'waiting'.
                status_msg = create_status_msg(msg_dict_off, taskInfos, sts='waiting')
                outStrList['success'] = '%s read from kafka offline task and back to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskInfos['results_base_dir'], taskInfos['msg_id'],status_msg)
                outStrList['failure'] = '#######kkafka ERROR when read from kafka offline task and back to kafka:,'
                outStrList['Refailure'] = '##############kafka ERROR when read from kafka offline task and resend back to kafka:'
                send_kafka(producer, kafka_par, status_msg, outStrList, fp_log)

        time0_taskQuery, printFlag = check_time_interval(time0_taskQuery, time_interval)
        outstr_task = ' task queue onLine cnt:%d offLine:%d'%(taskStatus['onLine'].qsize(), taskStatus['offLine'].qsize())

        # 2 - refresh GPU information
        gpuStatus = getGPUInfos()

        # 3 - online tasks take priority
        if not taskStatus['onLine'].empty():
            # 3.1 - look for a free GPU, then take the next online task
            cuda = get_available_gpu(gpuStatus)
            taskInfos = taskStatus['onLine'].get()
            print('################396',cuda)
            # NOTE(review): if get_available_gpu can return the integer 0 for
            # GPU 0, this truthiness test would treat it as "no GPU" — confirm.
            if cuda:
                # 3.1.1 - a free GPU exists: launch the process on it
                gpuProcess = lauch_process(cuda, taskInfos['inSource'], taskInfos['outSource'], taskInfos['results_base_dir'], taskInfos['msg_id'], par['modelJson'], kafka_par)
                taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess': gpuProcess, 'type': 'onLine', 'taskInfos': taskInfos}
            else:
                # 3.1.2 - no free GPU: look for a GPU running only offline processes
                cuda_pid = get_potential_gpu(gpuStatus, taskStatus['pidInfos'])
                if cuda_pid:
                    # 3.1.2.1 - there are offline processes that can be killed
                    cuda = cuda_pid['cuda']
                    pids = cuda_pid['pids']
                    cnt_off_0 = taskStatus['offLine'].qsize()
                    for pid in pids:
                        pid_info = taskStatus['pidInfos'][pid]
                        # Kill the offline process and put its task back in the queue.
                        pid_info['gpuProcess'].kill()
                        taskInfos_off = pid_info['taskInfos']
                        taskStatus['offLine'].put(taskInfos_off)
                        # Tell Kafka that the offline task is back to 'waiting'.
                        # Work on a copy so the shared template is not mutated.
                        msg = copy.deepcopy(msg_dict_off)
                        msg = update_json(taskInfos_off, msg, offkeys=["msg_id", "biz_id", "mod_id"])
                        msg['results'][0]['original_url'] = taskInfos_off['inSource']
                        msg['results'][0]['sign_url'] = get_boradcast_address(taskInfos_off['outSource'])
                        msg['status'] = 'waiting'
                        msg = json.dumps(msg, ensure_ascii=False)
                        # Log the pid of the killed process; the online process
                        # has not been launched yet at this point.
                        outStrList['success'] = '%s start online task after kill offline tasks and back to kafka: pid:%d taskId:%s msgId:%s send:%s'%('-'*20,pid,taskInfos_off['results_base_dir'], taskInfos_off['msg_id'],msg)
                        outStrList['failure'] = '#######kafka ERROR when start online task after kill offline tasks and back to kafka'
                        outStrList['Refailure'] = '##############kkafka ERROR when start online task after kill offline tasks and back to kafka'
                        send_kafka(producer, kafka_par, msg, outStrList, fp_log)
                    cnt_off_1 = taskStatus['offLine'].qsize()
                    outstr = '%s before killing process, offtask cnt:%d ,after killing, offtask cnt:%d %s'%('-'*20 ,cnt_off_0,cnt_off_1,'*'*20)
                    outstr = wrtiteLog(fp_log, outstr)
                    print(outstr)
                    gpuProcess = lauch_process(cuda, taskInfos['inSource'], taskInfos['outSource'], taskInfos['results_base_dir'], taskInfos['msg_id'], par['modelJson'], kafka_par)
                    # Update pidInfos with the new online process.
                    taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess': gpuProcess, 'type': 'onLine', 'taskInfos': taskInfos}
                else:
                    outstr = '######No available GPUs for onLine####'
                    outstr = wrtiteLog(fp_log, outstr)
                    print(outstr)
                    # The task was already taken off the queue; put it back so
                    # it is retried later instead of being silently lost.
                    taskStatus['onLine'].put(taskInfos)

        # 4 - refresh GPU information
        gpuStatus = getGPUInfos()

        # 5 - offline tasks
        if not taskStatus['offLine'].empty():
            cudaArrange = arrange_offlineProcess(gpuStatus, taskStatus['pidInfos'], modelMemory=1500)
            outstr = '###line342 IN OFF LINE TASKS available cudas:%s'%(cudaArrange)
            outstr = wrtiteLog(fp_log, outstr)
            print(outstr)
            for cuda in cudaArrange:
                if not taskStatus['offLine'].empty():
                    taskInfos = taskStatus['offLine'].get()
                    gpuProcess = lauch_process(cuda, taskInfos['inSource'], taskInfos['outSource'], taskInfos['results_base_dir'], taskInfos['msg_id'], par['modelJson'], kafka_par)
                    taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess': gpuProcess, 'type': 'offLine', 'taskInfos': taskInfos}

        if get_whether_gpuProcess():
            time0_sleep, printFlag = check_time_interval(time0_sleep, time_interval)
            if printFlag:
                timestr2 = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
                outstr = timestr2 + '*'*20 +'sleep '+'*'*20
                outstr = wrtiteLog(fp_log, outstr)
                print(outstr)
                outstr_task = wrtiteLog(fp_log, outstr_task)
                print(outstr_task)
        time.sleep(timeSleep)
    # Not reached: the loop above has no break.
    print('########Program End#####')
if __name__ == '__main__':
    # Script entry point: load the master configuration and run the scheduler.
    #
    # The 'par' section of conf/master.json is passed to detector(). In
    # par['topic'], index 0 is the online-task topic, index 1 the offline-task
    # topic and index 2 the topic that status messages are sent back on.
    # detector() reads the keys: server, topic, group_id, kafka, modelJson, logDir.
    masterFile = "conf/master.json"
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not os.path.exists(masterFile):
        raise FileNotFoundError('master config file not found: %s' % masterFile)
    with open(masterFile, 'r') as fp:
        data = json.load(fp)
    par = data['par']
    print(par)
    detector(par)