"""River-inspection video pipeline: stream reading, detection + segmentation, post-processing.

The module wires three long-running workers (``preprocess`` -> ``gpu_process``
-> ``post_process``) together through queues supplied by the caller, and
provides the helpers they share (NMS post-processing, water-mask filtering,
image encoding, problem-image dumping).
"""
import base64
import json
import os
import random
import string
import subprocess as sp
import sys
import time
import traceback
from collections import Counter
from multiprocessing import Process, Queue

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors

from models.experimental import attempt_load
from segutils.core.models.bisenet import BiSeNet_MultiOutput
from segutils.segWaterBuilding import SegModel, get_largest_contours, illBuildings
from utils.datasets import LoadStreams, LoadImages
from utils.general import (
    check_img_size,
    check_requirements,
    check_imshow,
    non_max_suppression,
    overlap_box_suppression,
    apply_classifier,
    scale_coords,
    xyxy2xywh,
    strip_optimizer,
    set_logging,
    increment_path,
)
from utils.plots import (
    plot_one_box,
    plot_one_box_PIL,
    draw_painting_joint,
    get_label_arrays,
    get_websource,
)

# Column layout of one detection row as produced by post_process_ and
# getDetectionsFromPreds: [x0, y0, x1, y1, conf, cls].
_CONF_COL = 4
_CLS_COL = 5

# Defaults formerly written as mutable default arguments.  They are only read.
_DEFAULT_RIVER = {'color': (0, 255, 255), 'line_width': 3, 'segRegionCnt': 2, 'segLineShow': True}
_DEFAULT_OBJECT_PAR = {'object_config': [0, 1, 2, 3, 4], 'slopeIndex': [5, 6, 7], 'segmodel': True, 'segRegionCnt': 1}
_DEFAULT_FONT = {'line_thickness': None, 'fontSize': None, 'boxLine_thickness': None,
                 'waterLineColor': (0, 255, 255), 'waterLineWidth': 3}
_DEFAULT_RIVER_PARS = {'slopeIndex': list(range(20)), 'riverIou': 0.1}


def bsJpgCode(image_ori):
    """JPEG-encode an image array and return it as a base64 ASCII string."""
    jpgCode = cv2.imencode('.jpg', image_ori)[-1]
    # Base64 output is pure ASCII, so decoding is equivalent to the former
    # ``str(bytes)[2:-1]`` trick without relying on the bytes repr.
    return base64.b64encode(jpgCode).decode('ascii')


def bsJpgDecode(bsCode):
    """Decode a base64 JPEG string (see ``bsJpgCode``) back into a colour image array."""
    raw = base64.b64decode(bsCode)
    buf = np.frombuffer(raw, np.uint8)
    return cv2.imdecode(buf, cv2.IMREAD_COLOR)


def get_ms(time0, time1):
    """Format the interval between two ``time.time()`` stamps as ``'<x.xx> ms'``."""
    return '%.2f ms' % ((time1 - time0) * 1000)


# Drawing palette, indexed by class id modulo its length.
rainbows = [
    (0, 0, 255), (0, 255, 0), (255, 0, 0), (255, 0, 255), (255, 255, 0), (255, 127, 0), (255, 0, 127),
    (127, 255, 0), (0, 255, 127), (0, 127, 255), (127, 0, 255), (255, 127, 255), (255, 255, 127),
    (127, 255, 255), (0, 255, 255), (255, 127, 255), (127, 255, 255),
    (0, 127, 0), (0, 0, 127), (0, 255, 255),
]


def get_labelnames(labelnames):
    """Load the class-name list stored under the ``'labelnames'`` key of a JSON file.

    Args:
        labelnames: path of the JSON file.
    """
    # Explicit UTF-8 so non-ASCII label names load regardless of platform locale.
    with open(labelnames, 'r', encoding='utf-8') as fp:
        return json.load(fp)['labelnames']


def check_stream(stream):
    """Return True when ``stream`` can be opened by ``cv2.VideoCapture``."""
    cap = cv2.VideoCapture(stream)
    try:
        return bool(cap.isOpened())
    finally:
        # The probe handle is only needed for the check; release it so
        # repeated probing does not leak capture handles.
        cap.release()


def drawWater(pred, image_array0, river=None):
    """Build a binary water mask from a segmentation output and optionally outline it.

    Args:
        pred: segmentation output for the water-only task (2-D array passed to
            ``cv2.findContours``).
        image_array0: image that receives the contour outlines (drawn in place).
        river: dict with keys ``'color'``, ``'line_width'``, ``'segRegionCnt'``
            (how many of the largest regions to keep) and ``'segLineShow'``
            (whether to draw the outline).  Defaults to yellow, width 3,
            2 regions, outline shown.

    Returns:
        ``(image_array0, water)`` where ``water`` has the shape of ``pred`` and
        is 1 inside the kept regions, 0 elsewhere.
    """
    if river is None:
        river = _DEFAULT_RIVER
    contours, hierarchy = cv2.findContours(pred, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    water = pred.copy()
    water[:, :] = 0
    if len(contours) == 0:
        return image_array0, water
    for max_id in get_largest_contours(contours, river['segRegionCnt']):
        cv2.fillPoly(water, [contours[max_id][:, 0, :]], 1)
        if river['segLineShow']:
            cv2.drawContours(image_array0, contours, max_id, river['color'], river['line_width'])
    return image_array0, water


def scale_back(boxes, padInfos):
    """Map boxes from padded/resized coordinates back to the original image (in place).

    Args:
        boxes: array/tensor whose first four columns are x0, y0, x1, y1.
        padInfos: ``(top, left, r)`` as returned by ``img_pad``.

    Returns:
        The same ``boxes`` object, modified in place.
    """
    top, left, r = padInfos[0:3]
    boxes[:, 0] = (boxes[:, 0] - left) * r
    boxes[:, 2] = (boxes[:, 2] - left) * r
    boxes[:, 1] = (boxes[:, 1] - top) * r
    boxes[:, 3] = (boxes[:, 3] - top) * r
    return boxes


def img_pad(img, size, pad_value=(114, 114, 114)):
    """Resize ``img`` keeping aspect ratio and pad it to exactly ``size``.

    Args:
        img: H x W x C image.
        size: target ``(height, width)``.
        pad_value: border colour.

    Returns:
        ``(pad_image, (top, left, r))``; the tuple is what ``scale_back`` needs
        to undo the transform (``r`` is the down-scale ratio).
    """
    H, W, _ = img.shape
    r = max(H / size[0], W / size[1])
    img_r = cv2.resize(img, (int(W / r), int(H / r)))
    tb = size[0] - img_r.shape[0]
    lr = size[1] - img_r.shape[1]
    top = int(tb / 2)
    bottom = tb - top
    left = int(lr / 2)
    right = lr - left
    pad_image = cv2.copyMakeBorder(img_r, top, bottom, left, right, cv2.BORDER_CONSTANT, value=pad_value)
    return pad_image, (top, left, r)


def _water_overlap_ratios(water, boxes):
    """Return, per box, the water-pixel count inside the box divided by the box area.

    ``boxes`` rows start with x0, y0, x1, y1.  Arithmetic is done in float64 so
    half-precision boxes cannot overflow, and slice bounds are clamped at 0 so
    a negative coordinate does not wrap around to the far side of the mask.
    """
    ratios = []
    for x0, y0, x1, y1 in np.asarray(boxes, dtype=np.float64)[:, :4]:
        rows = slice(max(int(y0), 0), max(int(y1), 0))
        cols = slice(max(int(x0), 0), max(int(x1), 0))
        covered = np.sum(water[rows, cols]) * 1.0
        ratios.append(covered / ((x1 - x0) * (y1 - y0) + 0.00001))
    return np.array(ratios)


def _detect_stage(datas, conf_thres, iou_thres, ObjectPar, font, padInfos, ovlap_thres):
    """Shared front half of ``post_process_`` / ``getDetections``.

    Runs NMS, draws the segmentation result, rescales boxes to the original
    image and drops boxes that are neither slope classes nor sufficiently
    over water.

    Returns:
        ``(im0s, im0, det, marks)`` where ``marks`` are the time stamps
        ``(time0, time1, time1_1, time1_2, time1_3, time2)`` used for the
        timing report.
    """
    slopeIndex = ObjectPar['slopeIndex']
    segmodel = ObjectPar['segmodel']
    segRegionCnt = ObjectPar['segRegionCnt']

    time0 = time.time()
    path, img, im0s, vid_cap, pred, seg_pred = datas[0:6]
    pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None, agnostic=False)
    if ovlap_thres:
        pred = overlap_box_suppression(pred, ovlap_thres)
    time1 = time.time()

    det = pred[0]  # one image per call
    time1_1 = time.time()
    # NOTE(review): no copy is taken, so anything drawn on im0 in place is
    # also visible through im0s[0] (the "original" image returned to callers).
    im0 = im0s[0]
    time1_2 = time.time()
    time1_3 = time.time()  # stage kept only so the timing report keeps its fields

    water = None
    if segmodel:
        if len(seg_pred) == 2:
            im0, water = illBuildings(seg_pred, im0)
        else:
            river = {
                'color': font['waterLineColor'],
                'line_width': font['waterLineWidth'],
                'segRegionCnt': segRegionCnt,
                # The built-in default font has no 'segLineShow' entry; fall
                # back to drawing the outline instead of raising KeyError.
                'segLineShow': font.get('segLineShow', True),
            }
            im0, water = drawWater(seg_pred, im0, river)
    time2 = time.time()

    if len(det) > 0:
        # Rescale boxes from network input size to original image size.
        if not padInfos:
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
        else:
            det[:, :4] = scale_back(det[:, :4], padInfos).round()

        # Use the water mask to decide which detections are valid.
        if segmodel:
            det_c = det.clone().cpu().numpy()
            cls_indexs = det_c[:, 5].astype(np.int32)
            slope_flag = np.array([c in slopeIndex for c in cls_indexs])
            water_flag = _water_overlap_ratios(water, det_c) > 0.1
            # Water targets must overlap the water mask by more than 0.1;
            # slope targets are always kept.
            det = det[water_flag | slope_flag]

    return im0s, im0, det, (time0, time1, time1_1, time1_2, time1_3, time2)


def _stage_report(marks, time3):
    """Format the timing report string shared by the post-processing entry points."""
    time0, time1, time1_1, time1_2, time1_3, time2 = marks
    return 'nms:%s drawWater:%s,copy:%s,toTensor:%s,detDraw:%s ' % (
        get_ms(time0, time1), get_ms(time1, time2), get_ms(time1_1, time1_2),
        get_ms(time1_2, time1_3), get_ms(time2, time3))


def post_process_(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe,
                  ObjectPar=None, font=None, padInfos=None, ovlap_thres=None):
    """NMS -> coordinate restore -> water filtering -> draw boxes.

    Args:
        datas: sequence whose first six items are
            ``path, img, im0s, vid_cap, pred, seg_pred``.
        conf_thres, iou_thres: NMS thresholds.
        names: class names (unused here, kept for interface compatibility).
        label_arraylist: per-class label images passed to ``draw_painting_joint``.
        rainbows: colour palette, indexed by class id modulo its length.
        iframe: frame number, echoed in the result.
        ObjectPar: dict with ``'object_config'`` (class ids to draw),
            ``'slopeIndex'`` (class ids exempt from the water filter),
            ``'segmodel'`` and ``'segRegionCnt'``.
        font: drawing options; also carries ``'waterLineColor'``,
            ``'waterLineWidth'`` and optionally ``'segLineShow'``.
        padInfos: ``(top, left, r)`` from ``img_pad``; when falsy,
            ``scale_coords`` is used instead of ``scale_back``.
        ovlap_thres: threshold for the optional second suppression pass.

    Returns:
        ``([im0s[0], im0, det_xywh, iframe], strout)``; each ``det_xywh`` row is
        ``[x0, y0, x1, y1, conf, cls]`` and ``strout`` is a timing report.
    """
    ObjectPar = _DEFAULT_OBJECT_PAR if ObjectPar is None else ObjectPar
    font = dict(_DEFAULT_FONT) if font is None else font
    object_config = ObjectPar['object_config']

    im0s, im0, det, marks = _detect_stage(datas, conf_thres, iou_thres, ObjectPar, font, padInfos, ovlap_thres)

    det_xywh = []
    for *xyxy, conf, cls in reversed(det):
        cls_id = int(cls)
        det_xywh.append([*(int(v) for v in xyxy), float(conf), float(cls)])
        if cls_id not in object_config:
            # Recorded but not drawn: not one of the requested classes.
            continue
        im0 = draw_painting_joint(xyxy, im0, label_arraylist[cls_id], score=conf,
                                  color=rainbows[cls_id % len(rainbows)], font=font)

    strout = _stage_report(marks, time.time())
    return [im0s[0], im0, det_xywh, iframe], strout


def getDetections(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe,
                  ObjectPar=None, font=None, padInfos=None, ovlap_thres=None):
    """Same pipeline as ``post_process_`` but without drawing detection boxes.

    Arguments are those of ``post_process_``.  Segmentation outlines are still
    drawn on the image.

    Returns:
        ``([im0s[0], im0, det_xywh, iframe], strout)``; each ``det_xywh`` row is
        ``[cls, x0, y0, x1, y1, conf]`` (note: different column order from
        ``post_process_``).
    """
    ObjectPar = _DEFAULT_OBJECT_PAR if ObjectPar is None else ObjectPar
    font = dict(_DEFAULT_FONT) if font is None else font

    im0s, im0, det, marks = _detect_stage(datas, conf_thres, iou_thres, ObjectPar, font, padInfos, ovlap_thres)

    det_xywh = [
        [float(cls), *(int(v) for v in xyxy), float(conf)]
        for *xyxy, conf, cls in reversed(det)
    ]

    strout = _stage_report(marks, time.time())
    return [im0s[0], im0, det_xywh, iframe], strout


def riverDetSegMixProcess(preds, water, pars=None):
    """Filter detections by their overlap with the water mask.

    Args:
        preds: 2-D list of detections; rows are read as
            ``[x0, y0, x1, y1, score, cls]``.
        water: 2-D array, 1 for water and 0 for background.
        pars: dict with
            ``'slopeIndex'`` - class ids of bank/slope targets (always kept);
            ``'riverIou'`` - minimum water-overlap ratio for other targets.
            Defaults to ``slopeIndex=range(20)``, ``riverIou=0.1``.

    Returns:
        ``(det, timeInfos)``: the kept rows (a NumPy array, or ``[]`` when
        ``preds`` is empty) and a timing string.
    """
    if pars is None:
        pars = _DEFAULT_RIVER_PARS
    assert 'slopeIndex' in pars.keys(), 'input para keys error,No: slopeIndex'
    assert 'riverIou' in pars.keys(), 'input para keys error, No: riverIou'
    time0 = time.time()
    slopeIndex, riverIou = pars['slopeIndex'], pars['riverIou']
    if len(preds) > 0:
        preds = np.array(preds)
        cls_indexs = [int(row[_CLS_COL]) for row in preds]
        slope_flag = np.array([c in slopeIndex for c in cls_indexs])
        water_flag = _water_overlap_ratios(water, preds) > riverIou
        # Water targets need enough overlap with the mask; slope targets are kept as is.
        det = preds[water_flag | slope_flag]
    else:
        det = []
    time1 = time.time()
    timeInfos = 'all: %.1f ' % ((time1 - time0))
    return det, timeInfos


def getDetectionsFromPreds(pred, img, im0, conf_thres=0.2, iou_thres=0.45, ovlap_thres=0.6, padInfos=None):
    """Turn raw detector output into a clipped list of boxes in original-image coordinates.

    Args:
        pred: raw output of the detection model.
        img: the tensor that was fed to the detection model.
        im0: the original image.
        conf_thres: confidence threshold of the first NMS pass.
        iou_thres: IoU threshold of the first NMS pass.
        ovlap_thres: threshold of the second suppression pass (skipped when falsy).
        padInfos: padding info from ``img_pad``; when falsy ``scale_coords`` is used.

    Returns:
        ``([im0, im0, det_xywh, 0], strout)``.  Each ``det_xywh`` row is
        ``[x0, y0, x1, y1, score, cls]`` with coordinates clipped to the image.
        The trailing ``0`` is meaningless and only keeps the list four items
        long like the older entry points.  ``strout`` is a timing report.
    """
    time0 = time.time()
    pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None, agnostic=False)
    if ovlap_thres:
        pred = overlap_box_suppression(pred, ovlap_thres)
    time1 = time.time()
    det = pred[0]  # one image per call
    det_xywh = []
    if len(det) > 0:
        # Restore coordinates to the original image size.
        H, W = im0.shape[0:2]
        if padInfos:
            det[:, :4] = scale_back(det[:, :4], padInfos).round()
        else:
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
        # Convert tensors to plain Python numbers and clip to the image.
        for *xyxy, conf, cls in reversed(det):
            x0, y0, x1, y1 = (int(v) for v in xyxy)
            x0 = max(0, x0)
            y0 = max(0, y0)
            x1 = min(W - 1, x1)
            y1 = min(H - 1, y1)
            det_xywh.append([x0, y0, x1, y1, float(conf), float(cls)])
    time2 = time.time()
    strout = 'nms:%s scaleback:%s ' % (get_ms(time0, time1), get_ms(time1, time2))
    return [im0, im0, det_xywh, 0], strout


def detectDraw(im0, dets, label_arraylist, rainbows, font):
    """Draw every detection in ``dets`` on ``im0`` and return the image.

    Rows of ``dets`` are read as ``[cls, x0, y0, x1, y1, score]`` - the layout
    produced by ``getDetections``, NOT the ``[x0, y0, x1, y1, score, cls]``
    layout of ``post_process_`` / ``getDetectionsFromPreds``.
    """
    for det in dets:
        cls_id = int(det[0])
        im0 = draw_painting_joint(det[1:5], im0, label_arraylist[cls_id], score=det[5],
                                  color=rainbows[cls_id % len(rainbows)], font=font)
    return im0


def preprocess(par):
    """Worker loop: read frames from ``par['source']`` and push them to ``par['queOut']``.

    Each queue item is ``[path, img, im0s, vid_cap, iframe]``.  When the
    stream cannot be opened or reading fails, the worker waits 10 s and
    retries, restarting the frame counter at 0.  Never returns.
    """
    print('#####process:', par['name'])
    while True:
        source = par['source']
        # Probe (and release) first so a dead stream does not leak a capture handle.
        if not check_stream(source):
            print('###read error:%s ' % (source))
            time.sleep(10)
            continue

        print('#### read %s success!' % (source))
        iframe = 0
        try:
            dataset = LoadStreams(source, img_size=640, stride=32)
            for path, img, im0s, vid_cap in dataset:
                par['queOut'].put([path, img, im0s, vid_cap, iframe])
                iframe += 1
        except Exception:
            print('###read error:%s ' % (source))
            traceback.print_exc()  # keep the cause visible instead of discarding it
            time.sleep(10)


def gpu_process(par):
    """Worker loop: run detection and segmentation on frames from ``par['queIn']``.

    Reads ``[path, img, im0s, vid_cap, iframe]`` items and pushes
    ``[path, img, im0s, vid_cap, pred, seg_pred, iframe]`` to ``par['queOut']``.
    Uses ``par['weights']`` and ``par['device']`` for the detector.  Never returns.
    """
    print('#####process:', par['name'])
    half = True
    device = par['device']
    print('###line127:', par['device'])
    model = attempt_load(par['weights'], map_location=par['device'])  # load FP32 model
    if half:
        model.half()

    # NOTE(review): the segmentation network, its weights path and its device
    # are hard-coded here and do not follow par['seg_nclass'],
    # par['seg_weights'] or par['device'] - confirm this is intended.
    nclass = [2, 2]
    seg_net = BiSeNet_MultiOutput(nclass)
    seg_weights = 'weights/segmentation/WaterBuilding.pth'
    segmodel = SegModel(model=seg_net, nclass=nclass, weights=seg_weights, device='cuda:0', multiOutput=True)

    while True:
        if par['queIn'].empty():
            time.sleep(1 / 300)
            continue

        time0 = time.time()
        datas = par['queIn'].get()
        path, img, im0s, vid_cap, iframe = datas[0:5]
        time1 = time.time()
        img = torch.from_numpy(img).to(device)
        img = img.half() if half else img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0
        time2 = time.time()
        # Inference only: without no_grad the output carries an autograd
        # graph, wasting memory.
        with torch.no_grad():
            pred = model(img, augment=False)[0]
        time3 = time.time()
        seg_pred = segmodel.eval(im0s[0], outsize=None, smooth_kernel=20)
        time4 = time.time()

        par['queOut'].put([path, img, im0s, vid_cap, pred, seg_pred, iframe])
        if par['debug']:
            print('#####process:', par['name'], ' line107')
            print('process:%s ,iframe:%d,getdata:%s,copygpu:%s,dettime:%s,segtime:%s , time:%s, queLen:%d ' % (
                par['name'], iframe, get_ms(time0, time1), get_ms(time1, time2), get_ms(time2, time3),
                get_ms(time3, time4), get_ms(time0, time4), par['queIn'].qsize()))


def get_cls(array):
    """Return the most frequent value of ``array`` as an int (first seen wins ties).

    Raises:
        ValueError: if ``array`` is empty.
    """
    counts = Counter(array)
    if not counts:
        raise ValueError('get_cls() arg is an empty sequence')
    return int(counts.most_common(1)[0][0])


def save_problem_images(post_results, iimage_cnt, names, streamName='live-THSAHD5M',
                        outImaDir='problems/images_tmp', imageTxtFile=False):
    """Pick the most confident frame of a batch and write its images to disk.

    Args:
        post_results: list of ``[original_img, ai_img, dets, frame_no]`` items
            as returned by ``post_process_``; ``dets`` rows are
            ``[x0, y0, x1, y1, conf, cls]``.
        iimage_cnt: counter embedded in the file names.
        names: class names.
        streamName: stream identifier embedded in the file names.
        outImaDir: output directory (created if missing).
        imageTxtFile: also write a ``.txt`` marker next to each image.

    Returns:
        dict with keys ``imgOR`` (annotated image), ``imgAR`` (original image),
        ``uid``, ``imgORname``, ``imgARname``, ``time_str`` and ``type``.
    """
    dets_list = [result[2] for result in post_results]
    # Mean confidence per frame.  Confidence is column 4 of the
    # [x0, y0, x1, y1, conf, cls] rows; column 5 is the class id.
    mean_scores = [np.array(dets)[:, _CONF_COL].mean() for dets in dets_list]
    best_index = mean_scores.index(max(mean_scores))  # frame with the highest mean confidence
    best_frame = post_results[best_index][3]  # absolute frame number
    img_send = post_results[best_index][1]  # image after AI processing
    img_bak = post_results[best_index][0]  # original image
    cls_max = get_cls(row[_CLS_COL] for row in dets_list[best_index])

    stamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
    uid = ''.join(random.sample(string.ascii_letters + string.digits, 16))

    # Historical naming: the "OR" variable holds the *_AI.jpg (annotated) path
    # and the "AR" variable holds the *_OR.jpg (original) path.
    outnameOR = '%s/%s_frame-%d-%d_type-%s_%s_s-%s_AI.jpg' % (
        outImaDir, stamp, best_frame, iimage_cnt, names[cls_max], uid, streamName)
    outnameAR = '%s/%s_frame-%d-%d_type-%s_%s_s-%s_OR.jpg' % (
        outImaDir, stamp, best_frame, iimage_cnt, names[cls_max], uid, streamName)

    # cv2.imwrite does not create missing directories.
    os.makedirs(outImaDir, exist_ok=True)
    cv2.imwrite(outnameOR, img_send)
    try:
        cv2.imwrite(outnameAR, img_bak)
    except cv2.error:
        print(outnameAR, type(img_bak), getattr(img_bak, 'shape', None))

    if imageTxtFile:
        for image_path in (outnameOR, outnameAR):
            with open(image_path.replace('.jpg', '.txt'), 'w', encoding='utf-8') as fp:
                fp.write(image_path + '\n')

    return {
        'imgOR': img_send,
        # Matches imgARname, which names the file holding the original image.
        'imgAR': img_bak,
        'uid': uid,
        'imgORname': os.path.basename(outnameOR),
        'imgARname': os.path.basename(outnameAR),
        'time_str': stamp,
        'type': names[cls_max],
    }


def post_process(par):
    """Worker loop: post-process detector output and periodically dump problem images.

    Reads items produced by ``gpu_process`` from ``par['queIn']``, pushes the
    ``post_process_`` result to ``par['queOut']`` and, every
    ``par['fpsample']`` frames, saves the best frame among those that
    contained detections.  Never returns.
    """
    print('#####process:', par['name'])
    conf_thres, iou_thres = par['conf_thres'], par['iou_thres']
    palette = par['rainbows']
    fpsample = par['fpsample']
    names = get_labelnames(par['labelnames'])
    label_arraylist = get_label_arrays(names, palette, outfontsize=40)
    post_results = []
    while True:
        if par['queIn'].empty():
            time.sleep(1 / 300)
            continue

        time0 = time.time()
        datas = par['queIn'].get()
        iframe = datas[6]
        if par['debug']:
            print('#####process:', par['name'], ' line129')
        p_result, timeOut = post_process_(datas, conf_thres, iou_thres, names, label_arraylist, palette, iframe)
        par['queOut'].put(p_result)

        # Every fpsample frames, save a picture if any problem was detected.
        if (iframe % fpsample == 0) and post_results:
            save_problem_images(post_results, iframe, names)
            post_results = []
        if len(p_result[2]) > 0:
            post_results.append(p_result)

        if par['debug']:
            print('process:%s ,iframe:%d,%s , time:%s, queLen:%d ' % (
                par['name'], iframe, timeOut, get_ms(time0, time.time()), par['queIn'].qsize()))


def save_logfile(name, txt):
    """Append a time-stamped line containing ``txt`` to the log file ``name``."""
    # 'a' creates the file when missing and always writes at the end; the
    # former 'r+' mode wrote at offset 0 and clobbered existing entries.
    with open(name, 'a') as fp:
        fp.write('%s %s \n' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), txt))


def time_str():
    """Return the current local time as ``YYYY-mm-dd HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


if __name__ == '__main__':
    jsonfile = 'config/queRiver.json'
    # NOTE(review): work_stream is neither defined nor imported in this file
    # as seen here - confirm where it is supposed to come from.
    work_stream(jsonfile)