You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

304 satır
11KB

  1. from kafka import KafkaProducer, KafkaConsumer,TopicPartition
  2. from kafka.errors import kafka_errors
  3. import os,cv2,sys,json,time
  4. import numpy as np
  5. import requests
  6. def query_channel_status(channelIndex):
  7. channel_query_api='https://streaming.t-aaron.com/livechannel/getLiveStatus/%s'%(channelIndex)
  8. #https://streaming.t-aaron.com/livechannel/getLiveStatus/LC001
  9. try:
  10. res = requests.get(channel_query_api,timeout=10).json()
  11. if res['data']['status']==2:#1空闲中 2使用中 3停用 4待关闭
  12. taskEnd=False
  13. else:
  14. taskEnd=True
  15. infos='channel_query_api connected'
  16. except Exception as e:
  17. taskEnd=True
  18. infos='channel_query_api not connected:%s'%(e)
  19. return infos, taskEnd
  20. def query_request_status(request_url):
  21. #channel_query_api='https://streaming.t-aaron.com/livechannel/getLiveStatus/%s'%(channelIndex)
  22. channel_request_api=request_url
  23. try:
  24. res = requests.get(channel_request_api,timeout=10).json()
  25. if res['data']['status']==5:#5:执行中 10:待停止分析 15:执行结束
  26. taskEnd=False
  27. else:
  28. taskEnd=True
  29. infos='channel_request_api connected'
  30. except Exception as e:
  31. taskEnd=True
  32. infos='channel_request_api not connected:%s'%(e)
  33. return infos, taskEnd
  34. def get_needed_objectsIndex(object_config):
  35. needed_objectsIndex=[]
  36. for model in object_config:
  37. try:
  38. needed_objectsIndex.append(int(model['id']))
  39. except Exception as e:
  40. a=1
  41. allowedList_str=[str(x) for x in needed_objectsIndex]
  42. allowedList_string=','.join(allowedList_str)
  43. return needed_objectsIndex , allowedList_string
  44. def get_infos(taskId, msgId,msg_h,key_str='waiting stream or video, send heartbeat'):
  45. outStrList={}
  46. outStrList['success']= '%s, taskId:%s msgId:%s send:%s'%(key_str,taskId, msgId,msg_h);
  47. outStrList['failure']='kafka ERROR, %s'%(key_str)
  48. outStrList['Refailure']='kafka Re-send ERROR ,%s'%(key_str)
  49. return outStrList
  50. def writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='结束'):
  51. #time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  52. EndUrl='%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_AI.jpg'%(outImaDir,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),endFlag,streamName)
  53. EndUrl = EndUrl.replace(' ','-').replace(':','-')
  54. img_end=np.zeros((100,100),dtype=np.uint8);cv2.imwrite(EndUrl,img_end)
  55. if imageTxtFile:
  56. EndUrl_txt = EndUrl.replace('.jpg','.txt')
  57. fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close()
  58. EndUrl='%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_OR.jpg'%(outImaDir,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),endFlag,streamName)
  59. EndUrl = EndUrl.replace(' ','-').replace(':','-')
  60. ret = cv2.imwrite(EndUrl,img_end)
  61. if imageTxtFile:
  62. EndUrl_txt = EndUrl.replace('.jpg','.txt')
  63. fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close()
  64. def get_current_time():
  65. """[summary] 获取当前时间
  66. [description] 用time.localtime()+time.strftime()实现
  67. :returns: [description] 返回str类型
  68. """
  69. ct = time.time()
  70. local_time = time.localtime(ct)
  71. data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
  72. data_secs = (ct - int(ct)) * 1000
  73. time_stamp = "%s.%03d" % (data_head, data_secs)
  74. return time_stamp
  75. def send_kafka(producer,par,msg,outStrList,fp_log,logger,line='000',thread='detector',printFlag=False ):
  76. future = producer.send(par['topic'], msg)
  77. try:
  78. record_metadata = future.get()
  79. outstr=outStrList['success']
  80. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  81. writeELK_log(outstr,fp_log,level='INFO',thread=thread,line=line,logger=logger,printFlag=printFlag)
  82. except Exception as e:
  83. outstr='%s , warning: %s'%( outStrList['failure'],str(e))
  84. writeELK_log(outstr,fp_log,level='WARNING',thread=thread,line=line,logger=logger,printFlag=printFlag)
  85. try:
  86. producer.close()
  87. producer = KafkaProducer(bootstrap_servers=par['server'], value_serializer=lambda v: v.encode('utf-8')).get()
  88. future = producer.send(par['topic'], msg).get()
  89. except Exception as e:
  90. outstr='%s, error: %s'%( outStrList['Refailure'],str(e))
  91. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  92. writeELK_log(outstr,fp_log,level='ERROR',thread=thread,line=line,logger=logger,printFlag=printFlag)
  93. def check_time_interval(time0_beg,time_interval):
  94. time_2 = time.time()
  95. if time_2 - time0_beg>time_interval:
  96. return time_2,True
  97. else:
  98. return time0_beg,False
  99. def addTime(strs):
  100. timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  101. outstr='\n %s %s '%(timestr,strs)
  102. return
  103. def get_file():
  104. print("文件名 :",__file__,sys._getframe().f_lineno)
  105. print("函数名: ", sys._getframe().f_code.co_name)
  106. print("模块名: ", sys._getframe().f_back.f_code.co_name)
  107. def writeELK_log(msg,fp,level='INFO',thread='detector',logger='kafka_yolov5',line=9999,newLine=False,printFlag=True):
  108. #timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  109. timestr=get_current_time()
  110. outstr='%s [%s][%s][%d][%s]- %s'%(timestr,level,thread,line,logger,msg)
  111. if newLine:
  112. outstr = '\n'+outstr
  113. fp.write(outstr+'\n')
  114. fp.flush()
  115. if printFlag:
  116. print(outstr)
  117. return outstr
  118. def wrtiteLog(fp,strs,newLine=False):
  119. timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  120. if newLine:
  121. outstr='\n %s %s '%(timestr,strs)
  122. else:
  123. outstr='%s %s '%(timestr,strs)
  124. fp.write(outstr+'\n')
  125. fp.flush()
  126. return outstr
  127. def create_logFile(logdir='logdir',name=None):
  128. if name:
  129. logname =logdir+'/'+ name
  130. else:
  131. logname =logdir+'/'+ time.strftime("%Y-%m-%d.txt", time.localtime())
  132. if os.path.exists(logname):
  133. fp_log = open(logname,'a+')
  134. else:
  135. fp_log = open(logname,'w')
  136. return fp_log
  137. def get_boradcast_address(outResource):
  138. #rtmp://live.push.t-aaron.com/live/THSB,阿里云,1945
  139. #rtmp://demopush.yunhengzhizao.cn/live/THSB,腾讯云,1935
  140. if '1945' in outResource:
  141. return 'rtmp://live.play.t-aaron.com/live/THSB'
  142. else:
  143. return 'rtmp://demoplay.yunhengzhizao.cn/live/THSB_HD5M'
  144. def save_message(kafka_dir,msg):
  145. outtxt=os.path.join(kafka_dir,msg['request_id']+'.json')
  146. assert os.path.exists(kafka_dir)
  147. with open(outtxt,'w') as fp:
  148. json.dump(msg,fp,ensure_ascii=False)
  149. def get_push_address(outResource):
  150. #rtmp://live.push.t-aaron.com/live/THSB,阿里云,1945
  151. #rtmp://demopush.yunhengzhizao.cn/live/THSB,腾讯云,1935
  152. #终端推流地址:rtmp://live.push.t-aaron.com/live/THSAa
  153. #终端拉流地址:rtmp://live.play.t-aaron.com/live/THSAa_hd
  154. #AI推流地址:rtmp://live.push.t-aaron.com/live/THSBa
  155. #AI拉流地址:rtmp://live.play.t-aaron.com/live/THSBa_hd
  156. if 't-aaron' in outResource:
  157. if 'THSBa' in outResource: port=1975
  158. elif 'THSBb' in outResource: port=1991
  159. elif 'THSBc' in outResource: port=1992
  160. elif 'THSBd' in outResource: port=1993
  161. elif 'THSBe' in outResource: port=1994
  162. elif 'THSBf' in outResource: port=1995
  163. elif 'THSBg' in outResource: port=1996
  164. elif 'THSBh' in outResource: port=1997
  165. else: port=1945
  166. else: port=1935
  167. return 'rtmp://127.0.0.1:%d/live/test'%(port)
  168. return outResource
  169. def getAllRecord_poll(consumer):
  170. msgs = consumer.poll(5000)
  171. keys=msgs.keys()
  172. out = [ msgs[x] for x in keys]
  173. out = [y for x in out for y in x]
  174. for key in keys:
  175. out.extend(msgs[key])
  176. return out
  177. def getAllRecords(consumer,topics):
  178. leftCnt = 0
  179. for topic in topics[0:2]:
  180. leftCnt+=get_left_cnt(consumer,topic)
  181. out = []
  182. if leftCnt == 0:
  183. return []
  184. for ii,msg in enumerate(consumer):
  185. consumer.commit()
  186. out.append(msg)
  187. if ii== (leftCnt-1):
  188. break###断流或者到终点
  189. return out
  190. def get_left_cnt(consumer,topic):
  191. partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
  192. # total
  193. toff = consumer.end_offsets(partitions)
  194. toff = [(key.partition, toff[key]) for key in toff.keys()]
  195. toff.sort()
  196. # current
  197. coff = [(x.partition, consumer.committed(x)) for x in partitions]
  198. coff.sort()
  199. # cal sum and left
  200. toff_sum = sum([x[1] for x in toff])
  201. cur_sum = sum([x[1] for x in coff if x[1] is not None])
  202. left_sum = toff_sum - cur_sum
  203. return left_sum
  204. def view_bar(num, total,time1,prefix='prefix'):
  205. rate = num / total
  206. time_n=time.time()
  207. rate_num = int(rate * 30)
  208. rate_nums = np.round(rate * 100)
  209. r = '\r %s %d / %d [%s%s] %.2f s'%(prefix,num,total, ">" * rate_num, " " * (30 - rate_num), time_n-time1 )
  210. sys.stdout.write(r)
  211. sys.stdout.flush()
  212. def get_total_cnt(inSource):
  213. cap=cv2.VideoCapture(inSource)
  214. assert cap.isOpened()
  215. cnt=cap.get(7)
  216. fps = cap.get(cv2.CAP_PROP_FPS)
  217. cap.release()
  218. return cnt,fps
  219. def check_stream(inSource,producer,par,msg,outStrList ,fp_log,logger,line='000',thread='detector',timeMs=120,):
  220. cnt =(timeMs-1)//10 + 1
  221. Stream_ok=False
  222. for icap in range(cnt):
  223. cap=cv2.VideoCapture(inSource)
  224. if cap.isOpened() and get_fps_rtmp(inSource,video=False)[0] :
  225. Stream_ok=True ;cap.release();break;
  226. #Stream_ok,_= get_fps_rtmp(inSource,video=False)
  227. #if Stream_ok:cap.release();break;
  228. else:
  229. Stream_ok=False
  230. timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
  231. outstr='Waiting stream %d s'%(10*icap)
  232. writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=line,logger=logger)
  233. time.sleep(10)
  234. if icap%3==0:
  235. send_kafka(producer,par,msg,outStrList,fp_log,logger=logger,line=line,thread=thread )
  236. return Stream_ok
  237. def get_fps_rtmp(inSource,video=False):
  238. cap=cv2.VideoCapture(inSource)
  239. if not cap.isOpened():
  240. print('#####error url:',inSource)
  241. return False,[0,0,0,0]
  242. fps = cap.get(cv2.CAP_PROP_FPS)
  243. width = cap.get(cv2.CAP_PROP_FRAME_WIDTH )
  244. height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
  245. cnt = 0
  246. if video: cnt=cap.get(7)
  247. if width*height==0 or fps>30:
  248. return False,[0,0,0,0]
  249. cap.release()
  250. try:
  251. outx = [fps,width,height,cnt]
  252. outx = [int(x+0.5) for x in outx]
  253. return True,outx
  254. except:
  255. return False, [0,0,0,0]