|
|
- from kafka import KafkaProducer, KafkaConsumer,TopicPartition
- from kafka.errors import kafka_errors
- import os,cv2,sys,json,time
- import numpy as np
- import requests
- def query_channel_status(channelIndex):
- channel_query_api='https://streaming.t-aaron.com/livechannel/getLiveStatus/%s'%(channelIndex)
- #https://streaming.t-aaron.com/livechannel/getLiveStatus/LC001
- try:
- res = requests.get(channel_query_api,timeout=10).json()
- if res['data']['status']==2:#1空闲中 2使用中 3停用 4待关闭
- taskEnd=False
- else:
- taskEnd=True
- infos='channel_query_api connected'
- except Exception as e:
- taskEnd=True
- infos='channel_query_api not connected:%s'%(e)
- return infos, taskEnd
-
- def query_request_status(request_url):
- #channel_query_api='https://streaming.t-aaron.com/livechannel/getLiveStatus/%s'%(channelIndex)
- channel_request_api=request_url
-
- try:
- res = requests.get(channel_request_api,timeout=10).json()
- if res['data']['status']==5:#5:执行中 10:待停止分析 15:执行结束
- taskEnd=False
- else:
- taskEnd=True
- infos='channel_request_api connected'
- except Exception as e:
- taskEnd=True
- infos='channel_request_api not connected:%s'%(e)
- return infos, taskEnd
-
- def get_needed_objectsIndex(object_config):
- needed_objectsIndex=[]
-
- for model in object_config:
- try:
- needed_objectsIndex.append(int(model['id']))
- except Exception as e:
- a=1
- allowedList_str=[str(x) for x in needed_objectsIndex]
- allowedList_string=','.join(allowedList_str)
-
- return needed_objectsIndex , allowedList_string
-
-
- def get_infos(taskId, msgId,msg_h,key_str='waiting stream or video, send heartbeat'):
- outStrList={}
- outStrList['success']= '%s, taskId:%s msgId:%s send:%s'%(key_str,taskId, msgId,msg_h);
- outStrList['failure']='kafka ERROR, %s'%(key_str)
- outStrList['Refailure']='kafka Re-send ERROR ,%s'%(key_str)
- return outStrList
- def writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='结束'):
- #time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- EndUrl='%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_AI.jpg'%(outImaDir,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),endFlag,streamName)
- EndUrl = EndUrl.replace(' ','-').replace(':','-')
- img_end=np.zeros((100,100),dtype=np.uint8);cv2.imwrite(EndUrl,img_end)
- if imageTxtFile:
- EndUrl_txt = EndUrl.replace('.jpg','.txt')
- fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close()
-
- EndUrl='%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_OR.jpg'%(outImaDir,time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),endFlag,streamName)
- EndUrl = EndUrl.replace(' ','-').replace(':','-')
- ret = cv2.imwrite(EndUrl,img_end)
- if imageTxtFile:
- EndUrl_txt = EndUrl.replace('.jpg','.txt')
- fp_t=open(EndUrl_txt,'w');fp_t.write(EndUrl+'\n');fp_t.close()
- def get_current_time():
- """[summary] 获取当前时间
-
- [description] 用time.localtime()+time.strftime()实现
- :returns: [description] 返回str类型
- """
- ct = time.time()
- local_time = time.localtime(ct)
- data_head = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
- data_secs = (ct - int(ct)) * 1000
- time_stamp = "%s.%03d" % (data_head, data_secs)
- return time_stamp
-
-
-
- def send_kafka(producer,par,msg,outStrList,fp_log,logger,line='000',thread='detector',printFlag=False ):
- future = producer.send(par['topic'], msg)
- try:
- record_metadata = future.get()
- outstr=outStrList['success']
-
- #outstr=wrtiteLog(fp_log,outstr);print( outstr);
- writeELK_log(outstr,fp_log,level='INFO',thread=thread,line=line,logger=logger,printFlag=printFlag)
-
- except Exception as e:
- outstr='%s , warning: %s'%( outStrList['failure'],str(e))
- writeELK_log(outstr,fp_log,level='WARNING',thread=thread,line=line,logger=logger,printFlag=printFlag)
- try:
- producer.close()
- producer = KafkaProducer(bootstrap_servers=par['server'], value_serializer=lambda v: v.encode('utf-8')).get()
- future = producer.send(par['topic'], msg).get()
- except Exception as e:
- outstr='%s, error: %s'%( outStrList['Refailure'],str(e))
- #outstr=wrtiteLog(fp_log,outstr);print( outstr);
- writeELK_log(outstr,fp_log,level='ERROR',thread=thread,line=line,logger=logger,printFlag=printFlag)
-
- def check_time_interval(time0_beg,time_interval):
- time_2 = time.time()
- if time_2 - time0_beg>time_interval:
- return time_2,True
- else:
- return time0_beg,False
- def addTime(strs):
- timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
-
- outstr='\n %s %s '%(timestr,strs)
- return
-
-
- def get_file():
- print("文件名 :",__file__,sys._getframe().f_lineno)
- print("函数名: ", sys._getframe().f_code.co_name)
- print("模块名: ", sys._getframe().f_back.f_code.co_name)
-
- def writeELK_log(msg,fp,level='INFO',thread='detector',logger='kafka_yolov5',line=9999,newLine=False,printFlag=True):
- #timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
- timestr=get_current_time()
- outstr='%s [%s][%s][%d][%s]- %s'%(timestr,level,thread,line,logger,msg)
-
- if newLine:
- outstr = '\n'+outstr
-
- fp.write(outstr+'\n')
- fp.flush()
- if printFlag:
- print(outstr)
- return outstr
-
-
- def wrtiteLog(fp,strs,newLine=False):
- timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
- if newLine:
- outstr='\n %s %s '%(timestr,strs)
- else:
- outstr='%s %s '%(timestr,strs)
- fp.write(outstr+'\n')
- fp.flush()
- return outstr
-
- def create_logFile(logdir='logdir',name=None):
- if name:
- logname =logdir+'/'+ name
- else:
- logname =logdir+'/'+ time.strftime("%Y-%m-%d.txt", time.localtime())
- if os.path.exists(logname):
- fp_log = open(logname,'a+')
- else:
- fp_log = open(logname,'w')
- return fp_log
- def get_boradcast_address(outResource):
- #rtmp://live.push.t-aaron.com/live/THSB,阿里云,1945
- #rtmp://demopush.yunhengzhizao.cn/live/THSB,腾讯云,1935
- if '1945' in outResource:
- return 'rtmp://live.play.t-aaron.com/live/THSB'
- else:
- return 'rtmp://demoplay.yunhengzhizao.cn/live/THSB_HD5M'
- def save_message(kafka_dir,msg):
- outtxt=os.path.join(kafka_dir,msg['request_id']+'.json')
- assert os.path.exists(kafka_dir)
- with open(outtxt,'w') as fp:
- json.dump(msg,fp,ensure_ascii=False)
-
-
-
- def get_push_address(outResource):
- #rtmp://live.push.t-aaron.com/live/THSB,阿里云,1945
- #rtmp://demopush.yunhengzhizao.cn/live/THSB,腾讯云,1935
- #终端推流地址:rtmp://live.push.t-aaron.com/live/THSAa
- #终端拉流地址:rtmp://live.play.t-aaron.com/live/THSAa_hd
- #AI推流地址:rtmp://live.push.t-aaron.com/live/THSBa
- #AI拉流地址:rtmp://live.play.t-aaron.com/live/THSBa_hd
-
- if 't-aaron' in outResource:
- if 'THSBa' in outResource: port=1975
- elif 'THSBb' in outResource: port=1991
- elif 'THSBc' in outResource: port=1992
- elif 'THSBd' in outResource: port=1993
- elif 'THSBe' in outResource: port=1994
- elif 'THSBf' in outResource: port=1995
- elif 'THSBg' in outResource: port=1996
- elif 'THSBh' in outResource: port=1997
- else: port=1945
- else: port=1935
- return 'rtmp://127.0.0.1:%d/live/test'%(port)
- return outResource
- def getAllRecord_poll(consumer):
- msgs = consumer.poll(5000)
- keys=msgs.keys()
- out = [ msgs[x] for x in keys]
- out = [y for x in out for y in x]
-
-
- for key in keys:
- out.extend(msgs[key])
- return out
- def getAllRecords(consumer,topics):
- leftCnt = 0
- for topic in topics[0:2]:
- leftCnt+=get_left_cnt(consumer,topic)
- out = []
- if leftCnt == 0:
- return []
- for ii,msg in enumerate(consumer):
- consumer.commit()
- out.append(msg)
- if ii== (leftCnt-1):
- break###断流或者到终点
- return out
-
- def get_left_cnt(consumer,topic):
- partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
-
- # total
- toff = consumer.end_offsets(partitions)
- toff = [(key.partition, toff[key]) for key in toff.keys()]
- toff.sort()
-
- # current
- coff = [(x.partition, consumer.committed(x)) for x in partitions]
- coff.sort()
-
- # cal sum and left
- toff_sum = sum([x[1] for x in toff])
- cur_sum = sum([x[1] for x in coff if x[1] is not None])
- left_sum = toff_sum - cur_sum
-
- return left_sum
- def view_bar(num, total,time1,prefix='prefix'):
- rate = num / total
- time_n=time.time()
- rate_num = int(rate * 30)
- rate_nums = np.round(rate * 100)
- r = '\r %s %d / %d [%s%s] %.2f s'%(prefix,num,total, ">" * rate_num, " " * (30 - rate_num), time_n-time1 )
- sys.stdout.write(r)
- sys.stdout.flush()
- def get_total_cnt(inSource):
- cap=cv2.VideoCapture(inSource)
- assert cap.isOpened()
- cnt=cap.get(7)
- fps = cap.get(cv2.CAP_PROP_FPS)
- cap.release()
- return cnt,fps
- def check_stream(inSource,producer,par,msg,outStrList ,fp_log,logger,line='000',thread='detector',timeMs=120,):
- cnt =(timeMs-1)//10 + 1
- Stream_ok=False
-
- for icap in range(cnt):
- cap=cv2.VideoCapture(inSource)
-
- if cap.isOpened() and get_fps_rtmp(inSource,video=False)[0] :
- Stream_ok=True ;cap.release();break;
- #Stream_ok,_= get_fps_rtmp(inSource,video=False)
- #if Stream_ok:cap.release();break;
- else:
- Stream_ok=False
- timestr=time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
- outstr='Waiting stream %d s'%(10*icap)
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=line,logger=logger)
- time.sleep(10)
- if icap%3==0:
- send_kafka(producer,par,msg,outStrList,fp_log,logger=logger,line=line,thread=thread )
-
-
- return Stream_ok
-
-
-
-
- def get_fps_rtmp(inSource,video=False):
- cap=cv2.VideoCapture(inSource)
- if not cap.isOpened():
- print('#####error url:',inSource)
- return False,[0,0,0,0]
-
- fps = cap.get(cv2.CAP_PROP_FPS)
- width = cap.get(cv2.CAP_PROP_FRAME_WIDTH )
- height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
- cnt = 0
- if video: cnt=cap.get(7)
-
- if width*height==0 or fps>30:
- return False,[0,0,0,0]
- cap.release()
- try:
- outx = [fps,width,height,cnt]
- outx = [int(x+0.5) for x in outx]
-
- return True,outx
- except:
- return False, [0,0,0,0]
-
-
|