用kafka接收消息
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

306 lines
13KB

  1. from kafka import KafkaProducer, KafkaConsumer
  2. from kafka.errors import kafka_errors
  3. import traceback
  4. import json, base64,os
  5. import numpy as np
  6. from multiprocessing import Process,Queue
  7. import time,cv2,string,random
  8. import subprocess as sp
  9. import matplotlib.pyplot as plt
  10. from utils.datasets import LoadStreams, LoadImages
  11. from models.experimental import attempt_load
  12. from utils.general import check_img_size, check_requirements, check_imshow, non_max_suppression, apply_classifier, scale_coords, xyxy2xywh, strip_optimizer, set_logging, increment_path
  13. import torch,sys
  14. #from segutils.segmodel import SegModel,get_largest_contours
  15. #sys.path.extend(['../yolov5/segutils'])
  16. from segutils.segWaterBuilding import SegModel,get_largest_contours,illBuildings
  17. #from segutils.core.models.bisenet import BiSeNet
  18. from segutils.core.models.bisenet import BiSeNet_MultiOutput
  19. from utils.plots import plot_one_box,plot_one_box_PIL,draw_painting_joint,get_label_arrays,get_websource
  20. from collections import Counter
  21. #import matplotlib
  22. import matplotlib.pyplot as plt
  23. # get_labelnames,get_label_arrays,post_process_,save_problem_images,time_str
  24. FP_DEBUG=open('debut.txt','w')
  25. def bsJpgCode(image_ori):
  26. jpgCode = cv2.imencode('.jpg',image_ori)[-1]###np.array,(4502009,1)
  27. bsCode = str(base64.b64encode(jpgCode))[2:-1] ###str,长6002680
  28. return bsCode
  29. def bsJpgDecode(bsCode):
  30. bsDecode = base64.b64decode(bsCode)###types,长4502009
  31. npString = np.frombuffer(bsDecode,np.uint8)###np.array,(长4502009,)
  32. jpgDecode = cv2.imdecode(npString,cv2.IMREAD_COLOR)###np.array,(3000,4000,3)
  33. return jpgDecode
  34. def get_ms(time0,time1):
  35. str_time ='%.2f ms'%((time1-time0)*1000)
  36. return str_time
  37. rainbows=[
  38. (0,0,255),(0,255,0),(255,0,0),(255,0,255),(255,255,0),(255,127,0),(255,0,127),
  39. (127,255,0),(0,255,127),(0,127,255),(127,0,255),(255,127,255),(255,255,127),
  40. (127,255,255),(0,255,255),(255,127,255),(127,255,255),
  41. (0,127,0),(0,0,127),(0,255,255)
  42. ]
  43. def get_labelnames(labelnames):
  44. with open(labelnames,'r') as fp:
  45. namesjson=json.load(fp)
  46. names_fromfile=namesjson['labelnames']
  47. names = names_fromfile
  48. return names
  49. def check_stream(stream):
  50. cap = cv2.VideoCapture(stream)
  51. if cap.isOpened():
  52. return True
  53. else:
  54. return False
  55. #####
  56. def drawWater(pred,image_array0):####pred是模型的输出,只有水分割的任务
  57. ##画出水体区域
  58. contours, hierarchy = cv2.findContours(pred,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
  59. water = pred.copy(); water[:,:] = 0
  60. if len(contours)==0:
  61. return image_array0,water
  62. max_id = get_largest_contours(contours);
  63. cv2.fillPoly(water, [contours[max_id][:,0,:]], 1)
  64. cv2.drawContours(image_array0,contours,max_id,(0,255,255),3)
  65. return image_array0,water
  66. def post_process_(datas,conf_thres, iou_thres,names,label_arraylist,rainbows,iframe):
  67. ##输入dataset genereate 生成的数据,model预测的结果pred,nms参数
  68. ##主要操作NMS ---> 坐标转换 ---> 画图
  69. ##输出原图、AI处理后的图、检测结果
  70. time0=time.time()
  71. path, img, im0s, vid_cap ,pred,seg_pred= datas[0:6];
  72. segmodel=True
  73. pred = non_max_suppression(pred, conf_thres, iou_thres, classes=None, agnostic=False)
  74. time1=time.time()
  75. i=0;det=pred[0]###一次检测一张图片
  76. p, s, im0 = path[i], '%g: ' % i, im0s[i].copy()
  77. gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh
  78. det_xywh=[];
  79. #im0_brg=cv2.cvtColor(im0,cv2.COLOR_RGB2BGR);
  80. if len(seg_pred)==2:
  81. im0,water = illBuildings(seg_pred,im0)
  82. else:
  83. im0,water = drawWater(seg_pred,im0)
  84. time2=time.time()
  85. #plt.imshow(im0);plt.show()
  86. if len(det)>0:
  87. # Rescale boxes from img_size to im0 size
  88. det[:, :4] = scale_coords(img.shape[2:], det[:, :4],im0.shape).round()
  89. #用seg模型,确定有效检测匡及河道轮廓线
  90. if segmodel:
  91. '''contours, hierarchy = cv2.findContours(seg_pred,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
  92. if len(contours)>0:
  93. max_id = get_largest_contours(contours)
  94. seg_pred[:,:] = 0
  95. cv2.fillPoly(seg_pred, [contours[max_id][:,0,:]], 1)
  96. cv2.drawContours(im0,contours,max_id,(0,255,255),3)'''
  97. det_c = det.clone(); det_c=det_c.cpu().numpy()
  98. area_factors = np.array([np.sum(water[int(x[1]):int(x[3]), int(x[0]):int(x[2])] )/((x[2]-x[0])*(x[3]-x[1])) for x in det_c] )
  99. det = det[area_factors>0.1]
  100. #对检测匡绘图
  101. for *xyxy, conf, cls in reversed(det):
  102. xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist() # normalized xywh
  103. cls_c = cls.cpu().numpy()
  104. conf_c = conf.cpu().numpy()
  105. line = [float(cls_c), *xywh, float(conf_c)] # label format
  106. det_xywh.append(line)
  107. label = f'{names[int(cls)]} {conf:.2f}'
  108. im0 = draw_painting_joint(xyxy,im0,label_arraylist[int(cls)],score=conf,color=rainbows[int(cls)%20],line_thickness=None)
  109. time3=time.time()
  110. strout='nms:%s illBuilding:%s detDraw:%s '%(get_ms(time0,time1),get_ms(time1,time2), get_ms(time2,time3) )
  111. return [im0s[0],im0,det_xywh,iframe],strout
  112. def preprocess(par):
  113. print('#####process:',par['name'])
  114. ##负责读取视频,生成原图及供检测的使用图,numpy格式
  115. #source='rtmp://liveplay.yunhengzhizao.cn/live/demo_HD5M'
  116. #img_size=640; stride=32
  117. while True:
  118. cap = cv2.VideoCapture(par['source'])
  119. iframe = 0
  120. if cap.isOpened():
  121. print( '#### read %s success!'%(par['source']))
  122. try:
  123. dataset = LoadStreams(par['source'], img_size=640, stride=32)
  124. for path, img, im0s, vid_cap in dataset:
  125. datas=[path, img, im0s, vid_cap,iframe]
  126. par['queOut'].put(datas)
  127. iframe +=1
  128. except Exception as e:
  129. print('###read error:%s '%(par['source']))
  130. time.sleep(10)
  131. iframe = 0
  132. else:
  133. print('###read error:%s '%(par['source'] ))
  134. time.sleep(10)
  135. iframe = 0
  136. def gpu_process(par):
  137. print('#####process:',par['name'])
  138. half=True
  139. ##gpu运算,检测模型
  140. weights = par['weights']
  141. device = par['device']
  142. print('###line127:',par['device'])
  143. model = attempt_load(par['weights'], map_location=par['device']) # load FP32 model
  144. if half:
  145. model.half()
  146. ##gpu运算,分割模型
  147. seg_nclass = par['seg_nclass']
  148. seg_weights = par['seg_weights']
  149. #segmodel = SegModel(nclass=seg_nclass,weights=seg_weights,device=device)
  150. nclass = [2,2]
  151. Segmodel = BiSeNet_MultiOutput(nclass)
  152. weights='weights/segmentation/WaterBuilding.pth'
  153. segmodel = SegModel(model=Segmodel,nclass=nclass,weights=weights,device='cuda:0',multiOutput=True)
  154. while True:
  155. if not par['queIn'].empty():
  156. time0=time.time()
  157. datas = par['queIn'].get()
  158. path, img, im0s, vid_cap,iframe = datas[0:5]
  159. time1=time.time()
  160. img = torch.from_numpy(img).to(device)
  161. img = img.half() if half else img.float() # uint8 to fp16/32
  162. img /= 255.0 # 0 - 255 to 0.0 - 1.0
  163. time2 = time.time()
  164. pred = model(img,augment=False)[0]
  165. time3 = time.time()
  166. seg_pred = segmodel.eval(im0s[0],outsize=None,smooth_kernel=20)
  167. time4 = time.time()
  168. fpStr= 'process:%s ,iframe:%d,getdata:%s,copygpu:%s,dettime:%s,segtime:%s , time:%s, queLen:%d '%( par['name'],iframe,get_ms(time0,time1) ,get_ms(time1,time2) ,get_ms(time2,time3) ,get_ms(time3,time4),get_ms(time0,time4) ,par['queIn'].qsize() )
  169. FP_DEBUG.write( fpStr+'\n' )
  170. datasOut = [path, img, im0s, vid_cap,pred,seg_pred,iframe]
  171. par['queOut'].put(datasOut)
  172. if par['debug']:
  173. print('#####process:',par['name'],' line107')
  174. else:
  175. time.sleep(1/300)
  176. def get_cls(array):
  177. dcs = Counter(array)
  178. keys = list(dcs.keys())
  179. values = list(dcs.values())
  180. max_index = values.index(max(values))
  181. cls = int(keys[max_index])
  182. return cls
  183. def save_problem_images(post_results,iimage_cnt,names,streamName='live-THSAHD5M',outImaDir='problems/images_tmp',imageTxtFile=False):
  184. ## [cls, x,y,w,h, conf]
  185. problem_image=[[] for i in range(6)]
  186. dets_list = [x[2] for x in post_results]
  187. mean_scores=[ np.array(x)[:,5].mean() for x in dets_list ] ###mean conf
  188. best_index = mean_scores.index(max(mean_scores)) ##获取该批图片里,问题图片的index
  189. best_frame = post_results[ best_index][3] ##获取绝对帧号
  190. img_send = post_results[best_index][1]##AI处理后的图
  191. img_bak = post_results[best_index][0]##原图
  192. cls_max = get_cls( x[0] for x in dets_list[best_index] )
  193. time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
  194. uid=''.join(random.sample(string.ascii_letters + string.digits, 16))
  195. #ori_name = '2022-01-20-15-57-36_frame-368-720_type-漂浮物_qVh4zI08ZlwJN9on_s-live-THSAHD5M_OR.jpg'
  196. #2022-01-13-15-07-57_frame-9999-9999_type-结束_9999999999999999_s-off-XJRW20220110115904_AI.jpg
  197. outnameOR= '%s/%s_frame-%d-%d_type-%s_%s_s-%s_AI.jpg'%(outImaDir,time_str,best_frame,iimage_cnt,names[cls_max],uid,streamName)
  198. outnameAR= '%s/%s_frame-%d-%d_type-%s_%s_s-%s_OR.jpg'%(outImaDir,time_str,best_frame,iimage_cnt,names[cls_max],uid,streamName)
  199. cv2.imwrite(outnameOR,img_send)
  200. cv2.imwrite(outnameAR,img_bak)
  201. if imageTxtFile:
  202. outnameOR_txt = outnameOR.replace('.jpg','.txt')
  203. fp=open(outnameOR_txt,'w');fp.write(outnameOR+'\n');fp.close()
  204. outnameAI_txt = outnameAR.replace('.jpg','.txt')
  205. fp=open(outnameAI_txt,'w');fp.write(outnameAR+'\n');fp.close()
  206. parOut = {}; parOut['imgOR'] = img_send; parOut['imgAR'] = img_send; parOut['uid']=uid
  207. parOut['imgORname']=os.path.basename(outnameOR);parOut['imgARname']=os.path.basename(outnameAR);
  208. parOut['time_str'] = time_str;parOut['type'] = names[cls_max]
  209. return parOut
  210. def post_process(par):
  211. print('#####process:',par['name'])
  212. ###post-process参数
  213. conf_thres,iou_thres,classes=par['conf_thres'],par['iou_thres'],par['classes']
  214. labelnames=par['labelnames']
  215. rainbows=par['rainbows']
  216. fpsample = par['fpsample']
  217. names=get_labelnames(labelnames)
  218. label_arraylist = get_label_arrays(names,rainbows,outfontsize=40)
  219. iimage_cnt = 0
  220. post_results=[]
  221. while True:
  222. if not par['queIn'].empty():
  223. time0=time.time()
  224. datas = par['queIn'].get()
  225. iframe = datas[6]
  226. if par['debug']:
  227. print('#####process:',par['name'],' line129')
  228. p_result,timeOut = post_process_(datas,conf_thres, iou_thres,names,label_arraylist,rainbows,iframe)
  229. par['queOut'].put(p_result)
  230. ##输出结果
  231. ##每隔 fpsample帧处理一次,如果有问题就保存图片
  232. if (iframe % fpsample == 0) and (len(post_results)>0) :
  233. #print('####line204:',iframe,post_results)
  234. save_problem_images(post_results,iframe,names)
  235. post_results=[]
  236. if len(p_result[2] )>0: ##
  237. #post_list = p_result.append(iframe)
  238. post_results.append(p_result)
  239. #print('####line201:',type(p_result))
  240. time1=time.time()
  241. outstr='process:%s ,iframe:%d,%s , time:%s, queLen:%d '%( par['name'],iframe,timeOut,get_ms(time0,time1) ,par['queIn'].qsize() )
  242. FP_DEBUG.write(outstr +'\n')
  243. #print( 'process:%s ,iframe:%d,%s , time:%s, queLen:%d '%( par['name'],iframe,timeOut,get_ms(time0,time1) ,par['queIn'].qsize() ) )
  244. else:
  245. time.sleep(1/300)
  246. def save_logfile(name,txt):
  247. if os.path.exists(name):
  248. fp=open(name,'r+')
  249. else:
  250. fp=open(name,'w')
  251. fp.write('%s %s \n'%(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),txt))
  252. fp.close()
  253. def time_str():
  254. return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  255. if __name__=='__main__':
  256. jsonfile='config/queRiver.json'
  257. #image_encode_decode()
  258. work_stream(jsonfile)
  259. #par={'name':'preprocess'}
  260. #preprocess(par)