from kafka import KafkaProducer, KafkaConsumer,TopicPartition from kafka.errors import kafka_errors import os,cv2,sys,json,time import numpy as np import requests def query_channel_status(channelIndex): channel_query_api='https://streaming.t-aaron.com/livechannel/getLiveStatus/%s'%(channelIndex) #https://streaming.t-aaron.com/livechannel/getLiveStatus/LC001 try: res = requests.get(channel_query_api,timeout=10).json() if res['data']['status']==2:#1空闲中 2使用中 3停用 4待关闭 taskEnd=False else: taskEnd=True infos='channel_query_api connected' except Exception as e: taskEnd=True infos='channel_query_api not connected:%s'%(e) return infos, taskEnd def query_request_status(request_url): #channel_query_api='https://streaming.t-aaron.com/livechannel/getLiveStatus/%s'%(channelIndex) channel_request_api=request_url try: res = requests.get(channel_request_api,timeout=10).json() if res['data']['status']==5:#5:执行中 10:待停止分析 15:执行结束 taskEnd=False else: taskEnd=True infos='channel_request_api connected' except Exception as e: taskEnd=True infos='channel_request_api not connected:%s'%(e) return infos, taskEnd def get_needed_objectsIndex(object_config): needed_objectsIndex=[] for model in object_config: try: needed_objectsIndex.append(int(model['id'])) except Exception as e: a=1 allowedList_str=[str(x) for x in needed_objectsIndex] allowedList_string=','.join(allowedList_str) return needed_objectsIndex , allowedList_string def get_infos(taskId, msgId,msg_h,key_str='waiting stream or video, send heartbeat'): outStrList={} outStrList['success']= '%s, taskId:%s msgId:%s send:%s'%(key_str,taskId, msgId,msg_h); outStrList['failure']='kafka ERROR, %s'%(key_str) outStrList['Refailure']='kafka Re-send ERROR ,%s'%(key_str) return outStrList def writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='结束'): #time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) EndUrl='%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_AI.jpg'%(outImaDir,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),endFlag,streamName) EndUrl = EndUrl.replace(' ','-').replace(':','-') img_end=np.zeros((100,100),dtype=np.uint8);cv2.imwrite(EndUrl,img_end) if imageTxtFile: EndUrl_txt = EndUrl.replace('.jpg','.txt') fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close() EndUrl='%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_OR.jpg'%(outImaDir,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),endFlag,streamName) EndUrl = EndUrl.replace(' ','-').replace(':','-') ret = cv2.imwrite(EndUrl,img_end) if imageTxtFile: EndUrl_txt = EndUrl.replace('.jpg','.txt') fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close() def get_current_time(): """[summary] 获取当前时间 [description] 用time.localtime()+time.strftime()实现 :returns: [description] 返回str类型 """ ct = time.time() local_time = time.localtime(ct) data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time) data_secs = (ct - int(ct)) * 1000 time_stamp = "%s.%03d" % (data_head, data_secs) return time_stamp def send_kafka(producer,par,msg,outStrList,fp_log,logger,line='000',thread='detector',printFlag=False ): future = producer.send(par['topic'], msg) try: record_metadata = future.get() outstr=outStrList['success'] #outstr=wrtiteLog(fp_log,outstr);print( outstr); writeELK_log(outstr,fp_log,level='INFO',thread=thread,line=line,logger=logger,printFlag=printFlag) except Exception as e: outstr='%s , warning: %s'%( outStrList['failure'],str(e)) writeELK_log(outstr,fp_log,level='WARNING',thread=thread,line=line,logger=logger,printFlag=printFlag) try: producer.close() producer = KafkaProducer(bootstrap_servers=par['server'], value_serializer=lambda v: v.encode('utf-8')).get() future = producer.send(par['topic'], msg).get() except Exception as e: outstr='%s, error: %s'%( outStrList['Refailure'],str(e)) #outstr=wrtiteLog(fp_log,outstr);print( outstr); writeELK_log(outstr,fp_log,level='ERROR',thread=thread,line=line,logger=logger,printFlag=printFlag) def check_time_interval(time0_beg,time_interval): time_2 = time.time() if time_2 - time0_beg>time_interval: return time_2,True else: return time0_beg,False def addTime(strs): timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()) outstr='\n %s %s '%(timestr,strs) return def get_file(): print("文件名 :",__file__,sys._getframe().f_lineno) print("函数名: ", sys._getframe().f_code.co_name) print("模块名: ", sys._getframe().f_back.f_code.co_name) def writeELK_log(msg,fp,level='INFO',thread='detector',logger='kafka_yolov5',line=9999,newLine=False,printFlag=True): #timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()) timestr=get_current_time() outstr='%s [%s][%s][%d][%s]- %s'%(timestr,level,thread,line,logger,msg) if newLine: outstr = '\n'+outstr fp.write(outstr+'\n') fp.flush() if printFlag: print(outstr) return outstr def wrtiteLog(fp,strs,newLine=False): timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()) if newLine: outstr='\n %s %s '%(timestr,strs) else: outstr='%s %s '%(timestr,strs) fp.write(outstr+'\n') fp.flush() return outstr def create_logFile(logdir='logdir',name=None): if name: logname =logdir+'/'+ name else: logname =logdir+'/'+ time.strftime("%Y-%m-%d.txt", time.localtime()) if os.path.exists(logname): fp_log = open(logname,'a+') else: fp_log = open(logname,'w') return fp_log def get_boradcast_address(outResource): #rtmp://live.push.t-aaron.com/live/THSB,阿里云,1945 #rtmp://demopush.yunhengzhizao.cn/live/THSB,腾讯云,1935 if '1945' in outResource: return 'rtmp://live.play.t-aaron.com/live/THSB' else: return 'rtmp://demoplay.yunhengzhizao.cn/live/THSB_HD5M' def save_message(kafka_dir,msg): outtxt=os.path.join(kafka_dir,msg['request_id']+'.json') assert os.path.exists(kafka_dir) with open(outtxt,'w') as fp: json.dump(msg,fp,ensure_ascii=False) def get_push_address(outResource): #rtmp://live.push.t-aaron.com/live/THSB,阿里云,1945 #rtmp://demopush.yunhengzhizao.cn/live/THSB,腾讯云,1935 #终端推流地址:rtmp://live.push.t-aaron.com/live/THSAa #终端拉流地址:rtmp://live.play.t-aaron.com/live/THSAa_hd #AI推流地址:rtmp://live.push.t-aaron.com/live/THSBa #AI拉流地址:rtmp://live.play.t-aaron.com/live/THSBa_hd if 't-aaron' in outResource: if 'THSBa' in outResource: port=1975 elif 'THSBb' in outResource: port=1991 elif 'THSBc' in outResource: port=1992 elif 'THSBd' in outResource: port=1993 elif 'THSBe' in outResource: port=1994 elif 'THSBf' in outResource: port=1995 elif 'THSBg' in outResource: port=1996 elif 'THSBh' in outResource: port=1997 else: port=1945 else: port=1935 return 'rtmp://127.0.0.1:%d/live/test'%(port) return outResource def getAllRecord_poll(consumer): msgs = consumer.poll(5000) keys=msgs.keys() out = [ msgs[x] for x in keys] out = [y for x in out for y in x] for key in keys: out.extend(msgs[key]) return out def getAllRecords(consumer,topics): leftCnt = 0 for topic in topics[0:2]: leftCnt+=get_left_cnt(consumer,topic) out = [] if leftCnt == 0: return [] for ii,msg in enumerate(consumer): consumer.commit() out.append(msg) if ii== (leftCnt-1): break###断流或者到终点 return out def get_left_cnt(consumer,topic): partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)] # total toff = consumer.end_offsets(partitions) toff = [(key.partition, toff[key]) for key in toff.keys()] toff.sort() # current coff = [(x.partition, consumer.committed(x)) for x in partitions] coff.sort() # cal sum and left toff_sum = sum([x[1] for x in toff]) cur_sum = sum([x[1] for x in coff if x[1] is not None]) left_sum = toff_sum - cur_sum return left_sum def view_bar(num, total,time1,prefix='prefix'): rate = num / total time_n=time.time() rate_num = int(rate * 30) rate_nums = np.round(rate * 100) r = '\r %s %d / %d [%s%s] %.2f s'%(prefix,num,total, ">" * rate_num, " " * (30 - rate_num), time_n-time1 ) sys.stdout.write(r) sys.stdout.flush() def get_total_cnt(inSource): cap=cv2.VideoCapture(inSource) assert cap.isOpened() cnt=cap.get(7) fps = cap.get(cv2.CAP_PROP_FPS) cap.release() return cnt,fps def check_stream(inSource,producer,par,msg,outStrList ,fp_log,logger,line='000',thread='detector',timeMs=120,): cnt =(timeMs-1)//10 + 1 Stream_ok=False for icap in range(cnt): cap=cv2.VideoCapture(inSource) if cap.isOpened() and get_fps_rtmp(inSource,video=False)[0] : Stream_ok=True ;cap.release();break; #Stream_ok,_= get_fps_rtmp(inSource,video=False) #if Stream_ok:cap.release();break; else: Stream_ok=False timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime()) outstr='Waiting stream %d s'%(10*icap) writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=line,logger=logger) time.sleep(10) if icap%3==0: send_kafka(producer,par,msg,outStrList,fp_log,logger=logger,line=line,thread=thread ) return Stream_ok def get_fps_rtmp(inSource,video=False): cap=cv2.VideoCapture(inSource) if not cap.isOpened(): print('#####error url:',inSource) return False,[0,0,0,0] fps = cap.get(cv2.CAP_PROP_FPS) width = cap.get(cv2.CAP_PROP_FRAME_WIDTH ) height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) cnt = 0 if video: cnt=cap.get(7) if width*height==0 or fps>30: return False,[0,0,0,0] cap.release() try: outx = [fps,width,height,cnt] outx = [int(x+0.5) for x in outx] return True,outx except: return False, [0,0,0,0]