# DSPRiverInspection/code_bak/consumer_sleep.py
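"""Kafka consumer for the river-inspection AI pipeline (backup "sleep" variant:
after draining the task topics it mostly sleeps and logs heartbeats instead of
launching analysis processes)."""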
import numpy as np
import time,ast,copy
from flask import request, Flask,jsonify
import base64,cv2,os,sys,json
sys.path.extend(['../yolov5'])
#from Send_tranfer import b64encode_function,JsonSend,name_dic,nameID_dic,getLogFileFp
from segutils.segmodel import SegModel,get_largest_contours
from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.torch_utils import select_device, load_classifier, time_synchronized
from queRiver import get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
import subprocess as sp
import matplotlib.pyplot as plt
import torch,random,string
import multiprocessing
from multiprocessing import Process,Queue
import traceback
from kafka import KafkaProducer, KafkaConsumer,TopicPartition
from kafka.errors import kafka_errors
#torch.multiprocessing.set_start_method('spawn')
import utilsK
from utilsK.GPUtils import *
from utilsK.masterUtils import *
from utilsK.sendUtils import create_status_msg,update_json
#from utilsK.modelEval import onlineModelProcess
from Send_tranfer_oss import msg_dict_on,msg_dict_off
process_id=0
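# Child-process entry point: loads the YOLOv5 detector and the segmentation model,
# reads frames from inSource, and writes annotated output to an RTMP stream and/or
# an MP4 file while reporting progress back over Kafka and the callback Pipe.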
def onlineModelProcess(parIn):
    DEBUG = False
    streamName = parIn['streamName']
    childCallback = parIn['callback']
    #try:
    for wan in ['test']:  # single-pass loop standing in for the commented-out try block
        jsonfile = parIn['modelJson']
        with open(jsonfile, 'r') as fp:
            parAll = json.load(fp)
        Detweights = parAll['gpu_process']['det_weights']
        seg_nclass = parAll['gpu_process']['seg_nclass']
        Segweights = parAll['gpu_process']['seg_weights']
        videoSave = parAll['AI_video_save']
        imageTxtFile = parAll['imageTxtFile']
        inSource, outSource = parIn['inSource'], parIn['outSource']
        kafka_par = parIn['kafka_par']
        producer = KafkaProducer(bootstrap_servers=kafka_par['server'],
                                 value_serializer=lambda v: v.encode('utf-8'),
                                 metadata_max_age_ms=120000)
        device = select_device(parIn['device'])
        half = device.type != 'cpu'  # half precision only supported on CUDA
        model = attempt_load(Detweights, map_location=device)  # load FP32 model
        if half: model.half()
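        # attempt_load returns the FP32 checkpoint; converting the weights to FP16
        # with .half() is only done when running on CUDA (half is False on CPU).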
        #print('###line116:,',len(dataset),dataset)
        if inSource.endswith('.MP4') or inSource.endswith('.mp4'):
            fps, outW, outH, totalcnt = get_fps_rtmp(inSource, video=True)[0:4]
        else:
            fps, outW, outH, totalcnt = get_fps_rtmp(inSource, video=False)[0:4]
        fps = int(fps + 0.5)
        segmodel = SegModel(nclass=seg_nclass, weights=Segweights, device=device)
        if outSource != 'NO':
            command = ['ffmpeg', '-y', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24',
                       '-s', "{}x{}".format(outW, outH),  # frame resolution
                       '-r', str(fps),                    # video frame rate
                       '-i', '-', '-c:v', 'libx264', '-pix_fmt', 'yuv420p',
                       '-f', 'flv', outSource
                       ]
            video_flag = videoSave['onLine']
            logdir = parAll['logChildProcessOnline']
            #print('*'*20,'###line82',command)
        else:
            video_flag = videoSave['offLine']
            logdir = parAll['logChildProcessOffline']
        fp_log = create_logFile(logdir=logdir)
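        # When pushing a live stream, an ffmpeg child process consumes raw BGR frames
        # on stdin (ppipe.stdin.write below) and publishes an H.264 FLV stream to outSource.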
        # pipe setup: only needed when pushing a live stream out
        if outSource != 'NO':
            ppipe = sp.Popen(command, stdin=sp.PIPE)
        ## post-processing parameters
        par = parAll['post_process']
        conf_thres, iou_thres, classes = par['conf_thres'], par['iou_thres'], par['classes']
        outImaDir = par['outImaDir']
        outVideoDir = par['outVideoDir']
        labelnames = par['labelnames']
        rainbows = par['rainbows']
        fpsample = par['fpsample']
        names = get_labelnames(labelnames)
        label_arraylist = get_label_arrays(names, rainbows, outfontsize=40)
        dataset = LoadStreams(inSource, img_size=640, stride=32)
        childCallback.send('####model load success####')
        if (outVideoDir != 'NO') and video_flag:
            msg_id = streamName.split('-')[2]
            save_path = os.path.join(outVideoDir, msg_id + '.MP4')
            vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW, outH))
        iframe = 0; post_results = []; time_beg = time.time()
        t00 = time.time()
        time_kafka0 = time.time()
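        # Main frame loop: read a frame, run segmentation + detection, post-process,
        # then fan the annotated frame out to the RTMP pipe / MP4 writer / problem-image saver.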
        for path, img, im0s, vid_cap in dataset:
            t0 = time_synchronized()
            if not path:  # an empty path marks the end of the stream
                # write sentinel "结束" (finished) images so downstream consumers see the end marker
                EndUrl = '%s/%s_frame-9999-9999_type-结束_9999999999999999_s-%s_AI.jpg' % (outImaDir, time_str(), streamName)
                EndUrl = EndUrl.replace(' ', '-').replace(':', '-')
                img_end = np.zeros((100, 100), dtype=np.uint8); cv2.imwrite(EndUrl, img_end)
                if imageTxtFile:
                    EndUrl_txt = EndUrl.replace('.jpg', '.txt')
                    fp_t = open(EndUrl_txt, 'w'); fp_t.write(EndUrl + '\n'); fp_t.close()
                EndUrl = '%s/%s_frame-9999-9999_type-结束_9999999999999999_s-%s_OR.jpg' % (outImaDir, time_str(), streamName)
                EndUrl = EndUrl.replace(' ', '-').replace(':', '-')
                ret = cv2.imwrite(EndUrl, img_end)
                if imageTxtFile:
                    EndUrl_txt = EndUrl.replace('.jpg', '.txt')
                    fp_t = open(EndUrl_txt, 'w'); fp_t.write(EndUrl + '\n'); fp_t.close()
                #print(EndUrl,ret)
                childCallback.send('####stream ends####')
                if (outVideoDir != 'NO') and video_flag:
                    vid_writer.release()
                break  ### stream dropped or reached the end
            if outSource == 'NO':  ### when not streaming out, show a progress bar
                view_bar(iframe, totalcnt, time_beg, parIn['process_uid'])
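            # The status template msg_dict_off comes from Send_tranfer_oss; taskId and
            # msgId are recovered from streamName, which lauch_process encodes as
            # '<off|live>-<taskId>-<msgId>'.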
            ### live and offline jobs both send a progress/heartbeat message about once a minute
            time_kafka1 = time.time()
            if time_kafka1 - time_kafka0 > 60:
                time_kafka0 = time_kafka1
                ### send the "waiting"/progress status message
                msg = copy.deepcopy(msg_dict_off); taskId, msgId = streamName.split('-')[1:3]
                msg['msg_id'] = msgId
                if outSource == 'NO':
                    msg['progressbar'] = '%.4f' % (iframe * 1.0 / totalcnt)
                    msg['type'] = 1
                else:
                    msg['progressbarOn'] = str(iframe)
                    msg['type'] = 2
                msg = json.dumps(msg, ensure_ascii=False)
                try:
                    record_metadata = producer.send(kafka_par['topic'], msg).get()
                    outstr = '%s processing send progressbar or heartBeat to kafka: taskId:%s msgId:%s send:%s' % ('-'*20, taskId, msgId, msg)
                    wrtiteLog(fp_log, outstr); print(outstr)
                except Exception as e:
                    outstr = '#######kafka ERROR when processing sending progressbar or heartBeat, error: %s' % (str(e))
                    wrtiteLog(fp_log, outstr); print(outstr)
                    try:
                        # rebuild the producer and retry once; note kafka_par is used here,
                        # not the post-process `par` dict that shadows it above
                        producer = KafkaProducer(bootstrap_servers=kafka_par['server'],
                                                 value_serializer=lambda v: v.encode('utf-8'))
                        future = producer.send(kafka_par['topic'], msg).get()
                    except Exception as e:
                        outstr = '%s re-send progressbar or heartBeat kafka,processing video or stream: taskId:%s msgId:%s send:%s' % ('-'*20, taskId, msgId, msg)
                        wrtiteLog(fp_log, outstr); print(outstr)
            time0 = time.time()
            iframe += 1
            time1 = time.time()
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()  # uint8 to fp16/32
            img /= 255.0  # 0 - 255 to 0.0 - 1.0
            timeseg0 = time.time()
            seg_pred, segstr = segmodel.eval(im0s[0])
            timeseg1 = time.time()
            t1 = time_synchronized()
            pred = model(img, augment=False)[0]
            time4 = time.time()
            datas = [path, img, im0s, vid_cap, pred, seg_pred, iframe]
            p_result, timeOut = post_process_(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe)
            t2 = time_synchronized()
            #print('###line138:',timeOut,outSource,outVideoDir)
            ## every fpsample frames, flush accumulated results: save problem images if any
            if (iframe % fpsample == 0) and (len(post_results) > 0):
                parImage = save_problem_images(post_results, iframe, names, streamName=streamName, outImaDir='problems/images_tmp', imageTxtFile=imageTxtFile)
                post_results = []
            if len(p_result[2]) > 0:  ## frame has detections: queue it for the next flush
                post_results.append(p_result)
            t3 = time_synchronized()
            image_array = p_result[1]
            if outSource != 'NO':
                ppipe.stdin.write(image_array.tobytes())
            if (outVideoDir != 'NO') and video_flag:
                ret = vid_writer.write(image_array)
            t4 = time_synchronized()
            timestr2 = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
            if iframe % 100 == 0:
                outstr = '%s,,read:%.1f ms,copy:%.1f, infer:%.1f ms, detinfer:%.1f ms,draw:%.1f ms, save:%.1f ms total:%.1f ms \n' % (
                    timestr2, (t0 - t00)*1000, (timeseg0 - t0)*1000, (t1 - timeseg0)*1000,
                    (t2 - t1)*1000, (t3 - t2)*1000, (t4 - t3)*1000, (t4 - t00)*1000)
                wrtiteLog(fp_log, outstr)
                #print(outstr)
            t00 = t4
    ## model-loading and similar errors would be caught here
    #except Exception as e:
    #    print(time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()), '*'*20, '###line177 ERROR:', e)
    #    childCallback.send(e)  # pass the exception out through the pipe
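# lauch_process: builds the per-task parameter dict, spawns onlineModelProcess in a
# child process, and blocks on the Pipe until the child reports model-load success
# (or an error) before returning the Process handle.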
def lauch_process(gpuid, inSource, outSource, taskId, msgId, modelJson, kafka_par):
    if outSource == 'NO':
        streamName = 'off-%s-%s' % (taskId, msgId)
    else:
        streamName = 'live-%s-%s' % (taskId, msgId)
    dataPar = {
        'imgData': '',
        'imgName': 'testW',
        'streamName': streamName,
        'taskId': taskId,
        'msgId': msgId,
        'device': str(gpuid),
        'modelJson': modelJson,
        'kafka_par': kafka_par,
    }
    #dataPar['inSource'] = 'http://images.5gai.taauav.com/video/8bc32984dd893930dabb2856eb92b4d1.mp4';dataPar['outSource'] = None
    dataPar['inSource'] = inSource; dataPar['outSource'] = outSource
    process_uid = ''.join(random.sample(string.ascii_letters + string.digits, 16)); dataPar['process_uid'] = process_uid
    parent_conn, child_conn = multiprocessing.Pipe(); dataPar['callback'] = child_conn
    gpuProcess = Process(target=onlineModelProcess, name='process:%s' % (process_uid), args=(dataPar,))
    gpuProcess.start()
    #print(dir(gpuProcess))
    child_return = parent_conn.recv()
    timestr2 = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
    print(timestr2, '-'*20, 'progress:%s ,msgId:%s , taskId:%s return:' % (process_uid, msgId, taskId), child_return)
    return gpuProcess
msg_dict_offline = {
    "biz_id": "hehuzhang",
    "mod_id": "ai",
    "msg_id": 'bb' + ''.join(random.sample(string.ascii_letters, 30)),
    "offering_id": "http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4",
    "offering_type": "mp4",
    "results_base_dir": "XJRW202203171535" + str(random.randint(10, 99)),
    'outSource': 'NO',
}
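# detector_0: debugging variant of the master loop. Each pass drains the task topics,
# then sleeps in 10-second ticks while printing a heartbeat line; it never launches
# GPU processes.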
def detector_0(par):
    #### initialize state: consumer, producer, task queues
    consumer = KafkaConsumer(
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        auto_offset_reset='earliest',
        #max_poll_interval_ms = 1000*60*6,
        #session_timeout_ms=1000*60*5,
        request_timeout_ms=15000,
        #enable_auto_commit=True
    )
    consumer.subscribe(par['topic'][0:2])
    kafka_par = {'server': par['server'], 'topic': par['topic'][2]}
    producer = KafkaProducer(
        bootstrap_servers=par['server'],  # tencent yun
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000)
    taskStatus = {}
    taskStatus['onLine'] = Queue(100)
    taskStatus['offLine'] = Queue(100)
    taskStatus['pidInfos'] = {}
    fp_log = create_logFile(logdir=par['logDir'])
    wrtiteLog(fp_log, '###########master starts######\n')
    timeSleep = 1
    #taskStatus['pidInfos'][31897]={'gpuProcess':'onlineProcess','type':'onLine'}
    time0 = time.time()
    time0_kafQuery = time.time()
    time0_taskQuery = time.time()
    time0_sleep = time.time()
    time_interval = 10; outStrList = {}
    isleep = 0
    while True:  ### poll once every timeSleep seconds
        #for isleep in range(1):
        ## 1 - read kafka and update the task lists
        try:
            #msgs = getAllRecords(consumer,par['topic'])
            msgs = []
            for ii, msg in enumerate(consumer):
                consumer.commit()
                msgs.append(msg)
        except Exception as e:
            outstr = '%s kafka connecting error:%s ' % ('#'*20, e)
            outstr = wrtiteLog(fp_log, outstr); print(outstr)
            time.sleep(timeSleep)
            continue
        #if get_whether_gpuProcess():
        for it in range(30):
            timestr = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
            print('%s i=%d sleep:%s ' % (timestr, isleep, it*10))
            time.sleep(10)
        isleep += 1
    print('########Program End#####')
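# detector: consumes task messages from the two task topics and logs each record's
# value/offset/partition; in this backup variant no analysis task is launched.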
def detector(par):
    #### initialize state: consumer, producer, task queues
    consumer = KafkaConsumer(
        bootstrap_servers=par['server'],
        group_id=par['group_id'],
        auto_offset_reset='earliest',
        #max_poll_interval_ms = 1000*60*6,
        #session_timeout_ms=1000*60*5,
        #request_timeout_ms=11000,
        #enable_auto_commit=True
    )
    consumer.subscribe(par['topic'][0:2])
    kafka_par = {'server': par['server'], 'topic': par['topic'][2]}
    producer = KafkaProducer(
        bootstrap_servers=par['server'],  # tencent yun
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000)
    taskStatus = {}
    taskStatus['onLine'] = Queue(100)
    taskStatus['offLine'] = Queue(100)
    taskStatus['pidInfos'] = {}
    timeSleep = 1
    #taskStatus['pidInfos'][31897]={'gpuProcess':'onlineProcess','type':'onLine'}
    time0 = time.time()
    time0_kafQuery = time.time()
    time0_taskQuery = time.time()
    time0_sleep = time.time()
    time_interval = 10; outStrList = {}
    isleep = 0
    for ii, msg in enumerate(consumer):
        try:
            taskInfos = eval(msg.value.decode('utf-8'))
        except:
            outstr = '%s msg format error,value:%s,offset:%d partition:%s topic:%s' % ('#'*20, msg.value, msg.offset, msg.partition, msg.topic)
            print(outstr)
            continue
        outstr = '%s value:%s,offset:%d partition:%s topic:%s' % ('#'*20, msg.value, msg.offset, msg.partition, msg.topic)
        print(outstr)
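# get_file: debug helper printing the current file/line, function name, and the
# caller's code-object name via sys._getframe().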
def get_file():
    print("file name:", __file__, sys._getframe().f_lineno)
    print("function name:", sys._getframe().f_code.co_name)
    print("caller name:", sys._getframe().f_back.f_code.co_name)
if __name__ == '__main__':
    par = {}
    ### topic[0] -- online tasks, topic[1] -- offline tasks
    #par['server']='212.129.223.66:9092';par['topic']=('thsw','thsw2','testReturn');par['group_id']='test';
    #101.132.127.1:19092
    '''
    par['server']='101.132.127.1:19092 ';par['topic']=('alg-online-tasks','alg-offline-tasks','alg-task-results');par['group_id']='test';
    par['kafka']='mintors/kafka'
    par['modelJson']='conf/model.json'
    '''
    masterFile = "conf/master_ten.json"
    assert os.path.exists(masterFile)
    with open(masterFile, 'r') as fp:
        data = json.load(fp)
    get_file()
    par = data['par']
    print(par)
    detector(par)