from kafka import KafkaProducer, KafkaConsumer
import kafka.errors as kafka_errors  # kafka-python keeps its error classes in the kafka.errors module
import traceback
import json, base64, os
import numpy as np
from multiprocessing import Process, Queue
import time, cv2, string, random
import subprocess as sp
import matplotlib.pyplot as plt
from utils.datasets import LoadStreams, LoadImages
from models.experimental import attempt_load
from utils.general import check_img_size, check_requirements, check_imshow, non_max_suppression, apply_classifier, scale_coords, xyxy2xywh, strip_optimizer, set_logging, increment_path
import torch, sys
#from segutils.segmodel import SegModel,get_largest_contours
#sys.path.extend(['../yolov5/segutils'])
from segutils.segWaterBuilding import SegModel, get_largest_contours, illBuildings
#from segutils.core.models.bisenet import BiSeNet
from segutils.core.models.bisenet import BiSeNet_MultiOutput
from utils.plots import plot_one_box, plot_one_box_PIL, draw_painting_joint, get_label_arrays, get_websource
from collections import Counter
#import matplotlib
# get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
#FP_DEBUG=open('debut.txt','w')
def bsJpgCode(image_ori):
    # JPEG-compress the image, then base64-encode the buffer into a plain string
    jpgCode = cv2.imencode('.jpg', image_ori)[-1]         ### np.array, e.g. shape (4502009, 1)
    bsCode = str(base64.b64encode(jpgCode))[2:-1]         ### str, e.g. length 6002680
    return bsCode

def bsJpgDecode(bsCode):
    bsDecode = base64.b64decode(bsCode)                   ### bytes, e.g. length 4502009
    npString = np.frombuffer(bsDecode, np.uint8)          ### np.array, e.g. shape (4502009,)
    jpgDecode = cv2.imdecode(npString, cv2.IMREAD_COLOR)  ### np.array, e.g. (3000, 4000, 3)
    return jpgDecode
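# Hedged usage sketch (not called anywhere in this file): round-trip a frame through the
# two helpers above so it can travel as a string payload, e.g. inside a Kafka/JSON message.
# The image path 'test.jpg' is an illustrative placeholder, not part of the original pipeline.
def _demo_bs_jpg_roundtrip(image_path='test.jpg'):
    img = cv2.imread(image_path)   # BGR uint8 array read from disk
    code = bsJpgCode(img)          # JPEG bytes -> base64 str (safe to embed in JSON)
    img_back = bsJpgDecode(code)   # base64 str -> JPEG bytes -> BGR uint8 array
    return img_back is not None and img_back.shape == img.shape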
def get_ms(time0, time1):
    str_time = '%.2f ms' % ((time1 - time0) * 1000)
    return str_time
rainbows = [
    (0,0,255),(0,255,0),(255,0,0),(255,0,255),(255,255,0),(255,127,0),(255,0,127),
    (127,255,0),(0,255,127),(0,127,255),(127,0,255),(255,127,255),(255,255,127),
    (127,255,255),(0,255,255),(255,127,255),(127,255,255),
    (0,127,0),(0,0,127),(0,255,255)
]
def get_labelnames(labelnames):
    with open(labelnames, 'r') as fp:
        namesjson = json.load(fp)
    names_fromfile = namesjson['labelnames']
    names = names_fromfile
    return names
def check_stream(stream):
    cap = cv2.VideoCapture(stream)
    if cap.isOpened():
        return True
    else:
        return False
#####
def drawWater(pred, image_array0, river={'color': (0,255,255), 'line_width': 3, 'segRegionCnt': 2}):
    #### pred is the segmentation model output (water-only segmentation task)
    ## outline the water region on the image and build a binary water mask
    contours, hierarchy = cv2.findContours(pred, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    water = pred.copy(); water[:, :] = 0
    if len(contours) == 0:
        return image_array0, water
    max_ids = get_largest_contours(contours, river['segRegionCnt'])
    for max_id in max_ids:
        cv2.fillPoly(water, [contours[max_id][:, 0, :]], 1)
        cv2.drawContours(image_array0, contours, max_id, river['color'], river['line_width'])
    return image_array0, water
def scale_back(boxes, padInfos):
    # map xyxy boxes from the padded/resized image back to original-image coordinates
    top, left, r = padInfos[0:3]
    boxes[:, 0] = (boxes[:, 0] - left) * r
    boxes[:, 2] = (boxes[:, 2] - left) * r
    boxes[:, 1] = (boxes[:, 1] - top) * r
    boxes[:, 3] = (boxes[:, 3] - top) * r
    return boxes
def img_pad(img, size, pad_value=[114,114,114]):
    ### letterbox to a fixed size: scale down to fit, then pad the borders with pad_value
    H, W, _ = img.shape
    r = max(H / size[0], W / size[1])
    img_r = cv2.resize(img, (int(W / r), int(H / r)))
    tb = size[0] - img_r.shape[0]
    lr = size[1] - img_r.shape[1]
    top = int(tb / 2)
    bottom = tb - top
    left = int(lr / 2)
    right = lr - left
    pad_image = cv2.copyMakeBorder(img_r, top, bottom, left, right, cv2.BORDER_CONSTANT, value=pad_value)
    return pad_image, (top, left, r)
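# Hedged usage sketch (not part of the original pipeline): img_pad and scale_back act as
# inverses. A frame is letterboxed to the detector input size, and boxes predicted on the
# padded image are mapped back to the original resolution by subtracting the pad offsets
# and multiplying by the resize ratio r. The 640x640 size and the box values are placeholders.
def _demo_pad_and_scale_back(frame):
    padded, padInfos = img_pad(frame, size=[640, 640])    # padInfos = (top, left, r)
    boxes = np.array([[100.0, 120.0, 300.0, 360.0]])      # fake xyxy detection in padded coords
    boxes_ori = scale_back(boxes.copy(), padInfos)        # same box in original-frame coords
    return padded, boxes_ori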
def post_process_(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe, ObjectPar={'object_config': [0,1,2,3,4], 'slopeIndex': [5,6,7], 'segmodel': True, 'segRegionCnt': 1}, font={'line_thickness': None, 'fontSize': None, 'boxLine_thickness': None, 'waterLineColor': (0,255,255), 'waterLineWidth': 3}, padInfos=None):
    object_config, slopeIndex, segmodel, segRegionCnt = ObjectPar['object_config'], ObjectPar['slopeIndex'], ObjectPar['segmodel'], ObjectPar['segRegionCnt']
    ## input: one batch produced by the dataset generator, the detector prediction pred, and the NMS parameters
    ## main steps: NMS ---> coordinate rescaling ---> drawing
    ## output: the original image, the AI-annotated image, and the detection results
    time0 = time.time()
    path, img, im0s, vid_cap, pred, seg_pred = datas[0:6]
    #segmodel=True
    pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None, agnostic=False)
    time1 = time.time()
    i = 0; det = pred[0]  ### one image is processed per call
    time1_1 = time.time()
    #p, s, im0 = path[i], '%g: ' % i, im0s[i].copy()
    p, s, im0 = path[i], '%g: ' % i, im0s[i]
    time1_2 = time.time()
    gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
    time1_3 = time.time()
    det_xywh = []
    #im0_brg=cv2.cvtColor(im0,cv2.COLOR_RGB2BGR);
    if segmodel:
        if len(seg_pred) == 2:
            im0, water = illBuildings(seg_pred, im0)
        else:
            river = {'color': font['waterLineColor'], 'line_width': font['waterLineWidth'], 'segRegionCnt': segRegionCnt}
            im0, water = drawWater(seg_pred, im0, river)
    time2 = time.time()
    #plt.imshow(im0);plt.show()
    if len(det) > 0:
        # Rescale boxes from img_size to im0 size
        if not padInfos:
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
        else:
            #print('####line131:',det[:, :])
            det[:, :4] = scale_back(det[:, :4], padInfos).round()
            #print('####line133:',det[:, :])
        # use the segmentation result to keep only valid detection boxes and the river contour
        if segmodel:
            cls_indexs = det[:, 5].clone().cpu().numpy().astype(np.int32)
            ## decide which targets belong to the bank/slope classes
            slope_flag = np.array([x in slopeIndex for x in cls_indexs])
            det_c = det.clone(); det_c = det_c.cpu().numpy()
            try:
                area_factors = np.array([np.sum(water[int(x[1]):int(x[3]), int(x[0]):int(x[2])]) * 1.0 / (1.0 * (x[2]-x[0]) * (x[3]-x[1]) + 0.00001) for x in det_c])
            except:
                print('*****************************line143: error:', det_c)
                area_factors = np.zeros(len(det_c))  # fall back to zero overlap so the pipeline keeps running
            water_flag = np.array(area_factors > 0.1)
            det = det[water_flag | slope_flag]  ## on-water targets must overlap the water mask by more than 0.1; slope targets are kept directly
        # draw the remaining detection boxes
        for *xyxy, conf, cls in reversed(det):
            xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()  # normalized xywh
            cls_c = cls.cpu().numpy()
            if int(cls_c) not in object_config:  ### skip classes that were not requested
                continue
            conf_c = conf.cpu().numpy()
            line = [float(cls_c), *xywh, float(conf_c)]  # label format
            det_xywh.append(line)
            label = f'{names[int(cls)]} {conf:.2f}'
            im0 = draw_painting_joint(xyxy, im0, label_arraylist[int(cls)], score=conf, color=rainbows[int(cls) % 20], font=font)
    time3 = time.time()
    strout = 'nms:%s drawWater:%s,copy:%s,toTensor:%s,detDraw:%s ' % (get_ms(time0, time1), get_ms(time1, time2), get_ms(time1_1, time1_2), get_ms(time1_2, time1_3), get_ms(time2, time3))
    return [im0s[0], im0, det_xywh, iframe], strout
def preprocess(par):
    print('#####process:', par['name'])
    ## reads the video stream and pushes the original frame plus the detector-ready image (numpy format) into the output queue
    #source='rtmp://liveplay.yunhengzhizao.cn/live/demo_HD5M'
    #img_size=640; stride=32
    while True:
        cap = cv2.VideoCapture(par['source'])
        iframe = 0
        if cap.isOpened():
            print('#### read %s success!' % (par['source']))
            try:
                dataset = LoadStreams(par['source'], img_size=640, stride=32)
                for path, img, im0s, vid_cap in dataset:
                    datas = [path, img, im0s, vid_cap, iframe]
                    par['queOut'].put(datas)
                    iframe += 1
            except Exception as e:
                print('###read error:%s ' % (par['source']))
                time.sleep(10)
                iframe = 0
        else:
            print('###read error:%s ' % (par['source']))
            time.sleep(10)
            iframe = 0
def gpu_process(par):
    print('#####process:', par['name'])
    half = True
    ## GPU work: load the detection model
    weights = par['weights']
    device = par['device']
    print('###line127:', par['device'])
    model = attempt_load(par['weights'], map_location=par['device'])  # load FP32 model
    if half:
        model.half()
    ## GPU work: load the segmentation model
    seg_nclass = par['seg_nclass']
    seg_weights = par['seg_weights']
    #segmodel = SegModel(nclass=seg_nclass,weights=seg_weights,device=device)
    nclass = [2, 2]
    Segmodel = BiSeNet_MultiOutput(nclass)
    weights = 'weights/segmentation/WaterBuilding.pth'
    segmodel = SegModel(model=Segmodel, nclass=nclass, weights=weights, device='cuda:0', multiOutput=True)
    while True:
        if not par['queIn'].empty():
            time0 = time.time()
            datas = par['queIn'].get()
            path, img, im0s, vid_cap, iframe = datas[0:5]
            time1 = time.time()
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()  # uint8 to fp16/32
            img /= 255.0  # 0 - 255 to 0.0 - 1.0
            time2 = time.time()
            pred = model(img, augment=False)[0]
            time3 = time.time()
            seg_pred = segmodel.eval(im0s[0], outsize=None, smooth_kernel=20)
            time4 = time.time()
            fpStr = 'process:%s ,iframe:%d,getdata:%s,copygpu:%s,dettime:%s,segtime:%s , time:%s, queLen:%d ' % (par['name'], iframe, get_ms(time0, time1), get_ms(time1, time2), get_ms(time2, time3), get_ms(time3, time4), get_ms(time0, time4), par['queIn'].qsize())
            #FP_DEBUG.write( fpStr+'\n' )
            datasOut = [path, img, im0s, vid_cap, pred, seg_pred, iframe]
            par['queOut'].put(datasOut)
            if par['debug']:
                print('#####process:', par['name'], ' line107')
        else:
            time.sleep(1 / 300)
def get_cls(array):
    # majority vote: return the most frequent class id in the iterable
    dcs = Counter(array)
    keys = list(dcs.keys())
    values = list(dcs.values())
    max_index = values.index(max(values))
    cls = int(keys[max_index])
    return cls
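# Hedged usage sketch (not called anywhere in this file): get_cls picks the dominant class
# among detection lines shaped [cls, x, y, w, h, conf]; the sample values are placeholders.
def _demo_get_cls():
    fake_lines = [[3, 0.1, 0.2, 0.3, 0.4, 0.9], [3, 0.5, 0.5, 0.1, 0.1, 0.8], [1, 0.2, 0.2, 0.1, 0.1, 0.7]]
    return get_cls(x[0] for x in fake_lines)  # -> 3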
def save_problem_images(post_results, iimage_cnt, names, streamName='live-THSAHD5M', outImaDir='problems/images_tmp', imageTxtFile=False):
    ## each detection line is [cls, x, y, w, h, conf]
    problem_image = [[] for i in range(6)]
    dets_list = [x[2] for x in post_results]
    mean_scores = [np.array(x)[:, 5].mean() for x in dets_list]  ### mean conf per frame
    best_index = mean_scores.index(max(mean_scores))  ## index of the "problem" frame within this batch
    best_frame = post_results[best_index][3]  ## absolute frame number
    img_send = post_results[best_index][1]    ## AI-annotated image
    img_bak = post_results[best_index][0]     ## original image
    cls_max = get_cls(x[0] for x in dets_list[best_index])
    time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
    uid = ''.join(random.sample(string.ascii_letters + string.digits, 16))
    #ori_name = '2022-01-20-15-57-36_frame-368-720_type-漂浮物_qVh4zI08ZlwJN9on_s-live-THSAHD5M_OR.jpg'
    #2022-01-13-15-07-57_frame-9999-9999_type-结束_9999999999999999_s-off-XJRW20220110115904_AI.jpg
    outnameOR = '%s/%s_frame-%d-%d_type-%s_%s_s-%s_AI.jpg' % (outImaDir, time_str, best_frame, iimage_cnt, names[cls_max], uid, streamName)
    outnameAR = '%s/%s_frame-%d-%d_type-%s_%s_s-%s_OR.jpg' % (outImaDir, time_str, best_frame, iimage_cnt, names[cls_max], uid, streamName)
    cv2.imwrite(outnameOR, img_send)
    cv2.imwrite(outnameAR, img_bak)
    if imageTxtFile:
        outnameOR_txt = outnameOR.replace('.jpg', '.txt')
        fp = open(outnameOR_txt, 'w'); fp.write(outnameOR + '\n'); fp.close()
        outnameAI_txt = outnameAR.replace('.jpg', '.txt')
        fp = open(outnameAI_txt, 'w'); fp.write(outnameAR + '\n'); fp.close()
    parOut = {}; parOut['imgOR'] = img_send; parOut['imgAR'] = img_send; parOut['uid'] = uid
    parOut['imgORname'] = os.path.basename(outnameOR); parOut['imgARname'] = os.path.basename(outnameAR)
    parOut['time_str'] = time_str; parOut['type'] = names[cls_max]
    return parOut
def post_process(par):
    print('#####process:', par['name'])
    ### post-processing parameters
    conf_thres, iou_thres, classes = par['conf_thres'], par['iou_thres'], par['classes']
    labelnames = par['labelnames']
    rainbows = par['rainbows']
    fpsample = par['fpsample']
    names = get_labelnames(labelnames)
    label_arraylist = get_label_arrays(names, rainbows, outfontsize=40)
    iimage_cnt = 0
    post_results = []
    while True:
        if not par['queIn'].empty():
            time0 = time.time()
            datas = par['queIn'].get()
            iframe = datas[6]
            if par['debug']:
                print('#####process:', par['name'], ' line129')
            p_result, timeOut = post_process_(datas, conf_thres, iou_thres, names, label_arraylist, rainbows, iframe)
            par['queOut'].put(p_result)
            ## publish the result
            ## every fpsample frames: if any problem detections were collected, save the images
            if (iframe % fpsample == 0) and (len(post_results) > 0):
                #print('####line204:',iframe,post_results)
                save_problem_images(post_results, iframe, names)
                post_results = []
            if len(p_result[2]) > 0:  ##
                #post_list = p_result.append(iframe)
                post_results.append(p_result)
                #print('####line201:',type(p_result))
            time1 = time.time()
            outstr = 'process:%s ,iframe:%d,%s , time:%s, queLen:%d ' % (par['name'], iframe, timeOut, get_ms(time0, time1), par['queIn'].qsize())
            #FP_DEBUG.write(outstr +'\n')
            #print( 'process:%s ,iframe:%d,%s , time:%s, queLen:%d '%( par['name'],iframe,timeOut,get_ms(time0,time1) ,par['queIn'].qsize() ) )
        else:
            time.sleep(1 / 300)
def save_logfile(name, txt):
    if os.path.exists(name):
        fp = open(name, 'a')  # append so earlier log lines are not overwritten
    else:
        fp = open(name, 'w')
    fp.write('%s %s \n' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), txt))
    fp.close()
def time_str():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
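# Hedged sketch (not the project's work_stream, which is defined outside the portion shown
# here): one plausible way to wire the three stage workers above with multiprocessing Queues.
# Every parameter value below (queue sizes, weight paths, thresholds, config names) is an
# illustrative placeholder, not taken from config/queRiver.json.
def _demo_wire_pipeline(source='rtmp://example/stream'):
    queFrames, quePreds, queOut = Queue(maxsize=10), Queue(maxsize=10), Queue(maxsize=10)
    parPre = {'name': 'preprocess', 'source': source, 'queOut': queFrames}
    parGpu = {'name': 'gpu_process', 'weights': 'weights/yolov5.pt', 'device': 'cuda:0',
              'seg_nclass': 2, 'seg_weights': 'weights/segmentation/WaterBuilding.pth',
              'queIn': queFrames, 'queOut': quePreds, 'debug': False}
    parPost = {'name': 'post_process', 'conf_thres': 0.25, 'iou_thres': 0.45, 'classes': None,
               'labelnames': 'config/labelnames.json', 'rainbows': rainbows, 'fpsample': 240,
               'queIn': quePreds, 'queOut': queOut, 'debug': False}
    procs = [Process(target=preprocess, args=(parPre,)),
             Process(target=gpu_process, args=(parGpu,)),
             Process(target=post_process, args=(parPost,))]
    for p in procs:
        p.start()
    return procs, queOut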
if __name__ == '__main__':
    jsonfile = 'config/queRiver.json'
    #image_encode_decode()
    work_stream(jsonfile)
    #par={'name':'preprocess'}
    #preprocess(par)