DSPRiverInspection/code_bak/master_0508.py

646 lines
31 KiB
Python
Raw Permalink Normal View History

2022-07-19 15:38:00 +08:00
import numpy as np
import time,ast,copy
from flask import request, Flask,jsonify
import base64,cv2,os,sys,json
sys.path.extend(['../yolov5'])
#from Send_tranfer import b64encode_function,JsonSend,name_dic,nameID_dic,getLogFileFp
from segutils.segmodel import SegModel,get_largest_contours
from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.torch_utils import select_device, load_classifier, time_synchronized
from queRiver import get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
import subprocess as sp
import matplotlib.pyplot as plt
import torch,random,string
import multiprocessing
from multiprocessing import Process,Queue
import traceback
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
from kafka.errors import kafka_errors
#torch.multiprocessing.set_start_method('spawn')
import utilsK
from utilsK.GPUtils import *
from utilsK.masterUtils import *
from utilsK.sendUtils import create_status_msg,update_json
#from utilsK.modelEval import onlineModelProcess
import random,string
from Send_tranfer_oss import msg_dict_on,msg_dict_off
import pykafka
from pykafka import KafkaClient
process_id=0
def onlineModelProcess(parIn ):
    """Run detection + segmentation over one video or live stream (child-process entry point).

    The function validates the input source, reports task status to Kafka,
    loads the detection and segmentation models, then processes the source
    frame by frame. Annotated frames are optionally piped to ffmpeg (when an
    output stream is configured) and/or written to a local video file.

    Args:
        parIn: dict built by the launcher. Keys read here:
            'streamName'  -- '<prefix>-<taskId>-<msgId>'; split on '-' to get ids.
            'callback'    -- pipe connection; short status strings are sent on it.
            'modelJson'   -- path of the JSON file with model/post-process settings.
            'inSource'    -- input video path/URL or stream address.
            'outSource'   -- push address, or the literal 'NO' for offline tasks.
            'kafka_par'   -- dict with 'server' (and whatever send_kafka needs).
            'device'      -- device string handed to select_device.
            'process_uid' -- id passed to view_bar for offline progress display.

    Returns:
        None. Returns early if the source cannot be opened.

    NOTE: exceptions are not caught here. An earlier revision wrapped the body
    in try/except and forwarded the exception through ``childCallback``.
    """
    streamName = parIn['streamName']
    childCallback = parIn['callback']
    outStrList = {}

    with open(parIn['modelJson'], 'r') as fp:
        parAll = json.load(fp)
    Detweights = parAll['gpu_process']['det_weights']
    seg_nclass = parAll['gpu_process']['seg_nclass']
    Segweights = parAll['gpu_process']['seg_weights']
    videoSave = parAll['AI_video_save']
    imageTxtFile = parAll['imageTxtFile']
    taskId, msgId = streamName.split('-')[1:3]
    inSource, outSource = parIn['inSource'], parIn['outSource']
    pushStream = outSource != 'NO'  # 'NO' marks an offline task (no stream is pushed)

    # Build the log file once; it is reused for the whole lifetime of the task.
    if pushStream:
        logdir = parAll['logChildProcessOnline']
        waitingTime = parAll['StreamWaitingTime']
    else:
        logdir = parAll['logChildProcessOffline']
        waitingTime = 5
    fp_log = create_logFile(logdir=logdir)

    kafka_par = parIn['kafka_par']
    producer = KafkaProducer(bootstrap_servers=kafka_par['server'],
                             value_serializer=lambda v: v.encode('utf-8'),
                             metadata_max_age_ms=120000)

    # Check that the source is usable before loading any model.
    # msg_h is the heartbeat/status message sent while an online task waits for its stream.
    msg_h = copy.deepcopy(msg_dict_off)
    msg_h['status'] = 'waiting'
    msg_h['msg_id'] = msgId
    if not pushStream:
        msg_h['type'] = 1
        Stream_ok = get_fps_rtmp(inSource, video=True)
    else:
        msg_h['type'] = 2
        msg_h_d = json.dumps(msg_h, ensure_ascii=False)
        outStrList['success'] = '%s waiting stream or video, send heartbeat: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h)
        outStrList['failure'] = '#######kafka ERROR waiting stream or video, send heartbeat'
        outStrList['Refailure'] = '##############kafka ERROR waiting stream or video, Re-send heartbeat'
        Stream_ok = check_stream(inSource, producer, kafka_par, msg_h_d, outStrList, fp_log, timeMs=waitingTime)

    if Stream_ok:
        # Tell the platform that processing starts.
        msg_h['status'] = 'running'
        msg_h_d = json.dumps(msg_h, ensure_ascii=False)
        outStrList['success'] = '%s informing stream/video is ok, taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h)
        outStrList['failure'] = '#######kafka ERROR ,when informing stream/video is ok'
        outStrList['Refailure'] = '##############kafka ERROR, when re-informing stream/video is ok'
        send_kafka(producer, kafka_par, msg_h_d, outStrList, fp_log)
    else:
        # Invalid offline video or live stream: report the failure and stop.
        outstr = '############# offline vedio or live stream Error:%s #################'%(inSource)
        outstr = wrtiteLog(fp_log, outstr)
        print(outstr)
        msg_h['error'] = str(1001)
        msg_h['status'] = 'failed'
        msg_h_d = json.dumps(msg_h, ensure_ascii=False)
        outStrList['success'] = '%s informing invaid video or stream success : taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg_h)
        outStrList['failure'] = '#######kafka ERROR, when informing invaid video or stream'
        outStrList['Refailure'] = '##############kafka ERROR,when re-informing invaid video or stream'
        send_kafka(producer, kafka_par, msg_h_d, outStrList, fp_log)
        childCallback.send(' offline vedio or live stream Error')
        return

    is_video_file = inSource.endswith(('.MP4', '.mp4'))
    fps, outW, outH, totalcnt = get_fps_rtmp(inSource, video=is_video_file)[0:4]
    fps = int(fps + 0.5)

    video_flag = videoSave['onLine'] if pushStream else videoSave['offLine']

    device = select_device(parIn['device'])
    half = device.type != 'cpu'  # half precision only supported on CUDA
    model = attempt_load(Detweights, map_location=device)  # load FP32 model
    if half:
        model.half()
    segmodel = SegModel(nclass=seg_nclass, weights=Segweights, device=device)

    # ffmpeg reads raw BGR frames on stdin and pushes them as FLV to outSource.
    if pushStream:
        command = ['ffmpeg','-y','-f', 'rawvideo','-vcodec','rawvideo','-pix_fmt', 'bgr24',
                   '-s', "{}x{}".format(outW,outH),  # frame resolution
                   '-r', str(fps),  # frame rate
                   '-i', '-','-c:v', 'libx264','-pix_fmt', 'yuv420p',
                   '-f', 'flv',outSource
                   ]
        ppipe = sp.Popen(command, stdin=sp.PIPE)

    # Post-processing parameters.
    par = parAll['post_process']
    conf_thres, iou_thres, classes = par['conf_thres'], par['iou_thres'], par['classes']
    outImaDir = par['outImaDir']
    outVideoDir = par['outVideoDir']
    labelnames = par['labelnames']
    rainbows = par['rainbows']
    fpsample = par['fpsample']
    names = get_labelnames(labelnames)
    label_arraylist = get_label_arrays(names, rainbows, outfontsize=40)

    dataset = LoadStreams(inSource, img_size=640, stride=32)
    childCallback.send('####model load success####')

    save_video = (outVideoDir != 'NO') and video_flag
    if save_video:
        save_path = os.path.join(outVideoDir, msgId + '.MP4')
        vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW, outH))

    iframe = 0
    post_results = []
    time_beg = time.time()
    t00 = time.time()
    time_kafka0 = time.time()
    for path, img, im0s, vid_cap in dataset:
        t0 = time_synchronized()
        if not path:
            # Stream interrupted or end of video: drop the "end" marker images
            # (one *_AI.jpg and one *_OR.jpg) and stop.
            for suffix in ('AI', 'OR'):
                _write_end_marker(outImaDir, streamName, suffix, imageTxtFile)
            childCallback.send('####strem ends####')
            if save_video:
                vid_writer.release()
            break

        if not pushStream:
            # Offline task: show a progress bar instead of pushing a stream.
            view_bar(iframe, totalcnt, time_beg, parIn['process_uid'])

        # Live and offline tasks both report to Kafka once per minute:
        # progress fraction for offline tasks, frame counter for live ones.
        time_kafka1 = time.time()
        if time_kafka1 - time_kafka0 > 60:
            time_kafka0 = time_kafka1
            msg = copy.deepcopy(msg_dict_off)
            msg['msg_id'] = msgId
            if not pushStream:
                msg['progressbar'] = '%.4f'%(iframe*1.0/totalcnt)
                msg['type'] = 1
            else:
                msg['progressbarOn'] = str(iframe)
                msg['type'] = 2
            msg = json.dumps(msg, ensure_ascii=False)
            outStrList['success'] = '%s processing send progressbar or heartBeat to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg)
            outStrList['failure'] = '#######kafka ERROR when processing sending progressbar or heartBeat'
            outStrList['Refailure'] = '%s re-send progressbar or heartBeat kafka,processing video or stream: taskId:%s msgId:%s send:%s'%('-'*20,taskId, msgId,msg)
            send_kafka(producer, kafka_par, msg, outStrList, fp_log)

        iframe += 1
        img = torch.from_numpy(img).to(device)
        img = img.half() if half else img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0
        timeseg0 = time.time()
        seg_pred, segstr = segmodel.eval(im0s[0])
        t1 = time_synchronized()
        pred = model(img, augment=False)[0]
        datas = [path, img, im0s, vid_cap, pred, seg_pred, iframe]
        p_result, timeOut = post_process_(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe)
        t2 = time_synchronized()

        # Every `fpsample` frames, flush the frames collected so far that had findings.
        # NOTE(review): the output directory is hard-coded here and differs from
        # par['outImaDir'] used for the end markers -- confirm this is intended.
        if (iframe % fpsample == 0) and (len(post_results) > 0):
            save_problem_images(post_results, iframe, names, streamName=streamName,
                                outImaDir='problems/images_tmp', imageTxtFile=imageTxtFile)
            post_results = []
        if len(p_result[2]) > 0:
            post_results.append(p_result)
        t3 = time_synchronized()

        image_array = p_result[1]
        if pushStream:
            ppipe.stdin.write(image_array.tobytes())
        if save_video:
            vid_writer.write(image_array)
        t4 = time_synchronized()

        timestr2 = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
        if iframe % 100 == 0:
            outstr = '%s,,read:%.1f ms,copy:%.1f, infer:%.1f ms, detinfer:%.1f ms,draw:%.1f ms, save:%.1f ms total:%.1f ms \n'%(timestr2,(t0 - t00)*1000,(timeseg0-t0)*1000, (t1 - timeseg0)*1000,(t2-t1)*1000, (t3 - t2)*1000,(t4-t3)*1000, (t4-t00)*1000)
            wrtiteLog(fp_log, outstr)
        # Start of the next frame's "read" interval.
        t00 = t4


def _write_end_marker(outImaDir, streamName, suffix, imageTxtFile):
    """Write a 100x100 black 'end of stream' marker image (and optional .txt sidecar).

    The file name embeds the current time_str(), the stream name and ``suffix``
    ('AI' or 'OR'); spaces and colons in the full path are replaced by '-'.
    When ``imageTxtFile`` is truthy, a .txt file containing the image path is
    written next to the image.
    """
    end_url = '%s/%s_frame-9999-9999_type-结束_9999999999999999_s-%s_%s.jpg'%(outImaDir,time_str(),streamName,suffix)
    end_url = end_url.replace(' ', '-').replace(':', '-')
    cv2.imwrite(end_url, np.zeros((100, 100), dtype=np.uint8))
    if imageTxtFile:
        with open(end_url.replace('.jpg', '.txt'), 'w') as fp_t:
            fp_t.write(end_url + '\n')
def lauch_process(gpuid,inSource,outSource,taskId,msgId,modelJson,kafka_par):
    """Start one worker process running onlineModelProcess and return it.

    The stream name is 'off-<taskId>-<msgId>' when ``outSource`` is the literal
    'NO' and 'live-<taskId>-<msgId>' otherwise. A random 16-character id is
    generated for the process and used in its name ('process:<id>').

    Args:
        gpuid: device id; passed to the worker as a string under 'device'.
        inSource: input video/stream address.
        outSource: push address, or 'NO'.
        taskId, msgId: identifiers embedded in the stream name.
        modelJson: path of the model configuration JSON.
        kafka_par: Kafka settings forwarded untouched to the worker.

    Returns:
        The started multiprocessing.Process.
    """
    prefix = 'off' if outSource == 'NO' else 'live'
    stream_name = '%s-%s-%s' % (prefix, taskId, msgId)

    uid = ''.join(random.sample(string.ascii_letters + string.digits, 16))
    # Only the child end of the pipe is handed over; the parent end is not read here.
    parent_end, child_end = multiprocessing.Pipe()

    worker_args = {
        'imgData': '',
        'imgName': 'testW',
        'streamName': stream_name,
        'taskId': taskId,
        'msgId': msgId,
        'device': str(gpuid),
        'modelJson': modelJson,
        'kafka_par': kafka_par,
        'inSource': inSource,
        'outSource': outSource,
        'process_uid': uid,
        'callback': child_end,
    }

    worker = Process(
        target=onlineModelProcess,
        name='process:%s' % (uid),
        args=(worker_args,),
    )
    worker.start()
    return worker
# Sample offline-task message. The message id ('bb' + 30 random letters) and
# the two-digit suffix of results_base_dir are randomised at import time.
msg_dict_offline = {
    "biz_id": "hehuzhang",
    "mod_id": "ai",
    "msg_id": 'bb' + ''.join(random.sample(string.ascii_letters, 30)),
    "offering_id": "http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
    "offering_type": "mp4",
    "results_base_dir": "XJRW202203171535%d" % random.randint(10, 99),
    'outSource': 'NO',
}
def detector(par):
    """Master scheduling loop: read tasks from Kafka and dispatch them to GPU worker processes.

    Each iteration (about once per ``timeSleep`` second):
      1. fetch new Kafka records and queue them as online or offline tasks,
         acknowledging each with a 'waiting' status message;
      2. start one pending online task, on a free GPU if there is one, otherwise
         on a GPU obtained by killing offline workers (whose tasks are re-queued);
      3. start pending offline tasks on the GPUs returned by arrange_offlineProcess.

    Args:
        par: configuration dict. Keys read here: 'server', 'topic' (items 0 and 1
            are subscribed to; a record on item 0 is an online task, anything
            else an offline task; item 2 is the topic status messages are sent
            to), 'group_id', 'logDir', 'kafka' and 'modelJson'.

    Returns:
        Never returns under normal operation (the loop has no exit condition).
    """
    consumer = KafkaConsumer(bootstrap_servers=par['server'], client_id='AI_server',
                             group_id=par['group_id'], auto_offset_reset='earliest')
    consumer.subscribe(par['topic'][0:2])

    kafka_par = {'server': par['server'], 'topic': par['topic'][2]}
    producer = KafkaProducer(
        bootstrap_servers=par['server'],
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000)

    # Pending tasks per kind, plus bookkeeping of launched workers keyed by pid.
    taskStatus = {}
    taskStatus['onLine'] = Queue(100)
    taskStatus['offLine'] = Queue(100)
    taskStatus['pidInfos'] = {}

    fp_log = create_logFile(logdir=par['logDir'])
    wrtiteLog(fp_log, '###########masster starts in line222######\n')

    timeSleep = 1
    time0_kafQuery = time.time()
    time0_taskQuery = time.time()
    time0_sleep = time.time()
    time_interval = 10
    outStrList = {}

    while True:  # poll once every timeSleep seconds
        # 1 - read Kafka and update the task queues
        try:
            msgs = getAllRecords(consumer, par['topic'])
        except Exception as e:
            outstr = '%s kafka connecting error:%s '%('#'*20,e)
            outstr = wrtiteLog(fp_log, outstr)
            print(outstr)
            time.sleep(timeSleep)
            continue

        time0_kafQuery, printFlag = check_time_interval(time0_kafQuery, time_interval)
        if printFlag:
            wrtiteLog(fp_log, ' ##### kafka Left %d records####'%(len(msgs)))

        for msg in msgs:
            try:
                # SECURITY NOTE(review): eval() executes whatever arrives on the
                # Kafka topic. Consider json.loads / ast.literal_eval once the
                # producers' payload format is confirmed.
                taskInfos = eval(msg.value.decode('utf-8') )
            except Exception:
                # Malformed payload: record it and move on to the next message.
                outstr = '%s msg format error,value:%s,offset:%d partition:%s topic:%s'%('#'*20,msg.value,msg.offset,msg.partition,msg.topic)
                outstr = wrtiteLog(fp_log, outstr)
                print(outstr)
                continue

            if msg.topic == par['topic'][0]:
                # Online (live stream) task.
                taskInfos['inSource'] = taskInfos['pull_channel']
                taskInfos['outSource'] = get_push_address(taskInfos['push_channel'])
                taskStatus['onLine'].put(taskInfos)
                save_message(par['kafka'], taskInfos)
                # Acknowledge with status 'waiting'.
                status_msg = create_status_msg(msg_dict_on, taskInfos, sts='waiting')
                outStrList['success'] = '%s read from kafka online task and back to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskInfos['results_base_dir'], taskInfos['msg_id'],status_msg)
                outStrList['failure'] = '#######kafka ERROR when read from kafka online task and back to kafka'
                outStrList['Refailure'] = '##############kafka ERROR when read from kafka online task and resend back to kafka:'
                send_kafka(producer, kafka_par, status_msg, outStrList, fp_log)
            else:
                # Offline (video file) task.
                taskInfos['inSource'] = taskInfos['offering_id']
                taskInfos['outSource'] = 'NO'
                taskStatus['offLine'].put(taskInfos)
                save_message(par['kafka'], taskInfos)
                # Acknowledge with status 'waiting'.
                status_msg = create_status_msg(msg_dict_off, taskInfos, sts='waiting')
                outStrList['success'] = '%s read from kafka offline task and back to kafka: taskId:%s msgId:%s send:%s'%('-'*20,taskInfos['results_base_dir'], taskInfos['msg_id'],status_msg)
                outStrList['failure'] = '#######kkafka ERROR when read from kafka offline task and back to kafka:,'
                outStrList['Refailure'] = '##############kafka ERROR when read from kafka offline task and resend back to kafka:'
                send_kafka(producer, kafka_par, status_msg, outStrList, fp_log)

        time0_taskQuery, printFlag = check_time_interval(time0_taskQuery, time_interval)
        outstr_task = ' task queue onLine cnt:%d offLine:%d'%(taskStatus['onLine'].qsize(), taskStatus['offLine'].qsize())

        # 2 - refresh GPU information
        gpuStatus = getGPUInfos()

        # 3 - online tasks have priority
        # (stream validation and the 'running' notification are done by the
        # worker itself, see onlineModelProcess)
        if not taskStatus['onLine'].empty():
            # 3.1 - look for a free GPU, then take the next online task
            cuda = get_available_gpu(gpuStatus)
            taskInfos = taskStatus['onLine'].get()
            print('################396',cuda)
            # NOTE(review): if get_available_gpu can return integer 0 for the first
            # GPU, this truthiness test treats it as "no GPU" -- confirm its return type.
            if cuda:
                # 3.1.1 - a GPU is free: launch directly
                gpuProcess = lauch_process(cuda, taskInfos['inSource'], taskInfos['outSource'],
                                           taskInfos['results_base_dir'], taskInfos['msg_id'],
                                           par['modelJson'], kafka_par)
                taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
            else:
                # 3.1.2 - no free GPU: look for one that only runs offline workers
                cuda_pid = get_potential_gpu(gpuStatus, taskStatus['pidInfos'])
                if cuda_pid:
                    # 3.1.2.1 - pre-empt the offline workers on that GPU
                    cuda = cuda_pid['cuda']
                    pids = cuda_pid['pids']
                    cnt_off_0 = taskStatus['offLine'].qsize()
                    for pid in pids:
                        # Kill the offline worker and put its task back on the offline queue.
                        taskStatus['pidInfos'][pid]['gpuProcess'].kill()
                        taskInfos_off = taskStatus['pidInfos'][pid]['taskInfos']
                        taskStatus['offLine'].put(taskInfos_off)
                        # Tell the platform the offline task is 'waiting' again.
                        # Work on a copy so the shared template is not modified.
                        status_msg = copy.deepcopy(msg_dict_off)
                        status_msg = update_json(taskInfos_off, status_msg, offkeys=["msg_id","biz_id" ,"mod_id"])
                        status_msg['results'][0]['original_url'] = taskInfos_off['inSource']
                        status_msg['results'][0]['sign_url'] = get_boradcast_address(taskInfos_off['outSource'])
                        status_msg['status'] = 'waiting'
                        status_msg = json.dumps(status_msg, ensure_ascii=False)
                        # Log the pid of the worker that was just killed.
                        outStrList['success'] = '%s start online task after kill offline tasks and back to kafka: pid:%d taskId:%s msgId:%s send:%s'%('-'*20,pid,taskInfos_off['results_base_dir'], taskInfos_off['msg_id'],status_msg)
                        outStrList['failure'] = '#######kafka ERROR when start online task after kill offline tasks and back to kafka'
                        outStrList['Refailure'] = '##############kkafka ERROR when start online task after kill offline tasks and back to kafka'
                        send_kafka(producer, kafka_par, status_msg, outStrList, fp_log)
                    cnt_off_1 = taskStatus['offLine'].qsize()
                    outstr = '%s before killing process, offtask cnt:%d ,after killing, offtask cnt:%d %s'%('-'*20 ,cnt_off_0,cnt_off_1,'*'*20)
                    outstr = wrtiteLog(fp_log, outstr)
                    print(outstr)
                    gpuProcess = lauch_process(cuda, taskInfos['inSource'], taskInfos['outSource'],
                                               taskInfos['results_base_dir'], taskInfos['msg_id'],
                                               par['modelJson'], kafka_par)
                    # Register the new online worker.
                    taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'onLine','taskInfos':taskInfos}
                else:
                    outstr = '######No available GPUs for onLine####'
                    outstr = wrtiteLog(fp_log, outstr)
                    print(outstr)
                    # The task was already taken off the queue; put it back so it
                    # is retried on a later iteration instead of being lost.
                    taskStatus['onLine'].put(taskInfos)

        # 4 - refresh GPU information
        gpuStatus = getGPUInfos()

        # 5 - offline tasks
        if not taskStatus['offLine'].empty():
            cudaArrange = arrange_offlineProcess(gpuStatus, taskStatus['pidInfos'], modelMemory=1500)
            outstr = '###line342 IN OFF LINE TASKS available cudas:%s'%(cudaArrange)
            outstr = wrtiteLog(fp_log, outstr)
            print(outstr)
            for cuda in cudaArrange:
                if not taskStatus['offLine'].empty():
                    taskInfos = taskStatus['offLine'].get()
                    # Video validation and the 'running' notification are done by the worker.
                    gpuProcess = lauch_process(cuda, taskInfos['inSource'], taskInfos['outSource'],
                                               taskInfos['results_base_dir'], taskInfos['msg_id'],
                                               par['modelJson'], kafka_par)
                    taskStatus['pidInfos'][gpuProcess.pid] = {'gpuProcess':gpuProcess,'type':'offLine','taskInfos':taskInfos}

        if get_whether_gpuProcess():
            time0_sleep, printFlag = check_time_interval(time0_sleep, time_interval)
            if printFlag:
                timestr2 = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
                outstr = timestr2 + '*'*20 +'sleep '+'*'*20
                outstr = wrtiteLog(fp_log, outstr)
                print(outstr)
                outstr_task = wrtiteLog(fp_log, outstr_task)
                print(outstr_task)
        time.sleep(timeSleep)
    # Not reached: the loop above has no break.
    print('########Program End#####')
if __name__ == '__main__':
    # Script entry point: load the master configuration and run the scheduler.
    # In par['topic'], item 0 is the online-task topic and item 1 the offline one.
    # Earlier revisions set the parameters inline instead of reading a file, e.g.:
    #   par['server']='101.132.127.1:19092 ';par['topic']=('alg-online-tasks','alg-offline-tasks','alg-task-results');par['group_id']='test';
    #   par['kafka']='mintors/kafka'
    #   par['modelJson']='conf/model.json'
    par = {}
    masterFile = "conf/master.json"
    assert os.path.exists(masterFile)
    with open(masterFile, 'r') as fp:
        par = json.load(fp)['par']
    print(par)
    detector(par)