AIlib2/utilsK/masterUtils.py

304 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from kafka import KafkaProducer, KafkaConsumer,TopicPartition
from kafka.errors import kafka_errors
import os,cv2,sys,json,time
import numpy as np
import requests
def query_channel_status(channelIndex):
    """Ask the streaming service whether a live channel is still in use.

    Issues a GET request (10 s timeout) to the ``getLiveStatus`` endpoint for
    ``channelIndex`` and reads ``data.status`` from the JSON reply.

    Args:
        channelIndex: Channel identifier inserted into the URL, e.g. ``LC001``.

    Returns:
        tuple: ``(infos, taskEnd)``. ``infos`` is a short connection message;
        ``taskEnd`` is ``False`` only when the reported status equals 2 and
        ``True`` for any other status or when the request/parsing fails.
    """
    # Example URL: https://streaming.t-aaron.com/livechannel/getLiveStatus/LC001
    url = 'https://streaming.t-aaron.com/livechannel/getLiveStatus/%s' % (channelIndex)
    try:
        payload = requests.get(url, timeout=10).json()
        # Status codes: 1 idle, 2 in use, 3 disabled, 4 waiting to be closed.
        taskEnd = payload['data']['status'] != 2
        infos = 'channel_query_api connected'
    except Exception as err:
        # Any failure (network, bad JSON, missing keys) is treated as "task ended".
        taskEnd = True
        infos = 'channel_query_api not connected:%s' % (err)
    return infos, taskEnd
def query_request_status(request_url):
    """Query a task-status URL and report whether the task has ended.

    Issues a GET request (10 s timeout) to ``request_url`` and reads
    ``data.status`` from the JSON reply.

    Args:
        request_url: Full URL of the status endpoint to query.

    Returns:
        tuple: ``(infos, taskEnd)``. ``infos`` is a short connection message;
        ``taskEnd`` is ``False`` only when the reported status equals 5 and
        ``True`` for any other status or when the request/parsing fails.
    """
    try:
        payload = requests.get(request_url, timeout=10).json()
        # Status codes: 5 running, 10 waiting to stop analysis, 15 finished.
        taskEnd = payload['data']['status'] != 5
        infos = 'channel_request_api connected'
    except Exception as err:
        # Any failure (network, bad JSON, missing keys) is treated as "task ended".
        taskEnd = True
        infos = 'channel_request_api not connected:%s' % (err)
    return infos, taskEnd
def get_needed_objectsIndex(object_config):
    """Collect the integer ids found in ``object_config``.

    Every entry is indexed with ``'id'`` and the value converted with
    ``int``; entries for which that raises are silently skipped.

    Args:
        object_config: Iterable of mappings that may carry an ``'id'`` item.

    Returns:
        tuple: ``(ids, ids_csv)`` - the list of ints and the same ids joined
        with commas into one string.
    """
    collected = []
    for entry in object_config:
        try:
            object_id = int(entry['id'])
        except Exception:
            # Best effort: entries without a usable id are ignored.
            continue
        collected.append(object_id)
    return collected, ','.join(map(str, collected))
def get_infos(taskId, msgId,msg_h,key_str='waiting stream or video, send heartbeat'):
    """Build the log messages used when reporting a Kafka send.

    Args:
        taskId: Task identifier embedded in the success message.
        msgId: Message identifier embedded in the success message.
        msg_h: Payload (or its description) embedded in the success message.
        key_str: Prefix describing what is being sent.

    Returns:
        dict: Messages keyed by ``'success'``, ``'failure'`` and ``'Refailure'``.
    """
    return {
        'success': '%s, taskId:%s msgId:%s send:%s' % (key_str, taskId, msgId, msg_h),
        'failure': 'kafka ERROR, %s' % (key_str),
        'Refailure': 'kafka Re-send ERROR ,%s' % (key_str),
    }
def writeTxtEndFlag(outImaDir,streamName,imageTxtFile,endFlag='结束'):
    """Write the two end-marker images (``_AI`` and ``_OR``) into ``outImaDir``.

    Each marker is a 100x100 all-zero uint8 image whose file name embeds the
    current local time, ``endFlag`` and ``streamName``; spaces and colons in
    the whole path are replaced by ``-``. When ``imageTxtFile`` is truthy a
    sibling ``.txt`` file containing the image path is written as well.

    Args:
        outImaDir: Directory prefix of the output files.
        streamName: Stream name embedded in the file names.
        imageTxtFile: Whether to also write the companion ``.txt`` files.
        endFlag: Marker text embedded in the file names.
    """
    templates = (
        '%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_AI.jpg',
        '%s/%s_frame-9999-9999_type-%s_9999999999999999_s-%s_OR.jpg',
    )
    blank = np.zeros((100, 100), dtype=np.uint8)
    for template in templates:
        # The timestamp is taken per file, exactly as the two original copies did.
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        image_path = template % (outImaDir, stamp, endFlag, streamName)
        image_path = image_path.replace(' ', '-').replace(':', '-')
        cv2.imwrite(image_path, blank)
        if imageTxtFile:
            txt_path = image_path.replace('.jpg', '.txt')
            with open(txt_path, 'w') as fp_t:
                fp_t.write(image_path + '\n')
def get_current_time():
    """Return the current local time as a string with millisecond precision.

    Built from ``time.time()``, ``time.localtime()`` and ``time.strftime()``.

    Returns:
        str: Timestamp formatted as ``YYYY-MM-DD HH:MM:SS.mmm``.
    """
    now = time.time()
    # Fractional part of the epoch time, expressed in milliseconds.
    millis = (now - int(now)) * 1000
    head = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    return "%s.%03d" % (head, millis)
def send_kafka(producer,par,msg,outStrList,fp_log,logger,line='000',thread='detector',printFlag=False ):
    """Send ``msg`` to ``par['topic']`` and log the outcome, retrying once.

    The message is sent with ``producer`` and the call blocks until the send
    future resolves. On failure a WARNING is logged, ``producer`` is closed
    and the message is re-sent once through a freshly created
    ``KafkaProducer`` connected to ``par['server']``; if that also fails an
    ERROR is logged. No exception from the wait or the retry is propagated.

    Args:
        producer: Producer used for the first attempt.
        par: Mapping providing ``'topic'`` and ``'server'``.
        msg: Message to send.
        outStrList: Mapping with ``'success'``, ``'failure'`` and
            ``'Refailure'`` log messages (see ``get_infos``).
        fp_log: Open log file passed to ``writeELK_log``.
        logger: Logger name written into the log line.
        line: Line marker written into the log line.
        thread: Thread name written into the log line.
        printFlag: Whether log lines are also printed.
    """
    future = producer.send(par['topic'], msg)
    try:
        # Block until the send future resolves; raises if the send failed.
        future.get()
        writeELK_log(outStrList['success'],fp_log,level='INFO',thread=thread,line=line,logger=logger,printFlag=printFlag)
    except Exception as e:
        outstr = '%s , warning: %s' % (outStrList['failure'], str(e))
        writeELK_log(outstr,fp_log,level='WARNING',thread=thread,line=line,logger=logger,printFlag=printFlag)
        retry_producer = None
        try:
            # NOTE(review): this closes the caller's producer while the
            # replacement below is local to this call - confirm callers do
            # not keep using the closed instance.
            producer.close()
            # The constructor result is the producer itself; the previous
            # trailing ``.get()`` on it raised AttributeError every time, so
            # the re-send could never succeed.
            retry_producer = KafkaProducer(
                bootstrap_servers=par['server'],
                value_serializer=lambda v: v.encode('utf-8'),
            )
            retry_producer.send(par['topic'], msg).get()
        except Exception as e:
            outstr = '%s, error: %s' % (outStrList['Refailure'], str(e))
            writeELK_log(outstr,fp_log,level='ERROR',thread=thread,line=line,logger=logger,printFlag=printFlag)
        finally:
            # The replacement producer cannot escape this function, so close
            # it here instead of leaking it.
            if retry_producer is not None:
                retry_producer.close()
def check_time_interval(time0_beg,time_interval):
    """Tell whether more than ``time_interval`` seconds passed since ``time0_beg``.

    Args:
        time0_beg: Reference time, in ``time.time()`` seconds.
        time_interval: Interval length in seconds.

    Returns:
        tuple: ``(now, True)`` when the interval has been exceeded, so the
        caller can restart from ``now``; otherwise ``(time0_beg, False)``.
    """
    now = time.time()
    elapsed = now - time0_beg
    if not elapsed > time_interval:
        return time0_beg, False
    return now, True
def addTime(strs):
    """Prefix ``strs`` with a newline and the current local time.

    Args:
        strs: Text to stamp.

    Returns:
        str: ``'\\n <YYYY-MM-DD HH:MM:SS > <strs> '``. The original version
        built this string but returned ``None`` through a bare ``return``.
    """
    timestr = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
    outstr = '\n %s %s ' % (timestr, strs)
    return outstr
def get_file():
    """Print debugging details about where this function is running.

    Prints this module's ``__file__`` with the current line number, the name
    of this function, and the code-object name of the caller's frame (printed
    under the "module name" label).
    """
    here = sys._getframe()
    print("文件名 :", __file__, here.f_lineno)
    print("函数名: ", here.f_code.co_name)
    print("模块名: ", here.f_back.f_code.co_name)
def writeELK_log(msg,fp,level='INFO',thread='detector',logger='kafka_yolov5',line=9999,newLine=False,printFlag=True):
    """Write one formatted log line to ``fp`` and optionally print it.

    The line has the form
    ``<timestamp> [<level>][<thread>][<line>][<logger>]- <msg>``.

    Args:
        msg: Message text.
        fp: Writable file object; it is flushed after the write.
        level: Severity label.
        thread: Thread label.
        logger: Logger label.
        line: Line marker. Numbers are formatted with ``%d`` as before;
            non-numeric values (such as the ``'000'`` default that
            ``send_kafka`` and ``check_stream`` pass) are written verbatim
            instead of raising ``TypeError``.
        newLine: When true, the log line is preceded by a blank line.
        printFlag: When true, the log line is also printed.

    Returns:
        str: The formatted line (without the trailing newline).
    """
    timestr = get_current_time()
    try:
        line_text = '%d' % (line,)
    except TypeError:
        # ``%d`` rejects strings; fall back to the plain text of the marker.
        line_text = str(line)
    outstr = '%s [%s][%s][%s][%s]- %s' % (timestr, level, thread, line_text, logger, msg)
    if newLine:
        outstr = '\n' + outstr
    fp.write(outstr + '\n')
    fp.flush()
    if printFlag:
        print(outstr)
    return outstr
def wrtiteLog(fp,strs,newLine=False):
    """Write ``strs`` to ``fp`` prefixed with the current local time.

    Args:
        fp: Writable file object; it is flushed after the write.
        strs: Text to log.
        newLine: When true, the entry starts with a newline and a space.

    Returns:
        str: The text written, without the trailing newline.
    """
    timestr = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
    template = '\n %s %s ' if newLine else '%s %s '
    outstr = template % (timestr, strs)
    fp.write(outstr + '\n')
    fp.flush()
    return outstr
def create_logFile(logdir='logdir',name=None):
    """Open a log file inside ``logdir`` and return the file object.

    Args:
        logdir: Directory prefix of the log file.
        name: File name; when falsy, ``<YYYY-MM-DD>.txt`` for today is used.

    Returns:
        file object: Opened with mode ``'a+'`` if the file already exists,
        otherwise with mode ``'w'``.
    """
    filename = name if name else time.strftime("%Y-%m-%d.txt", time.localtime())
    logname = logdir + '/' + filename
    mode = 'a+' if os.path.exists(logname) else 'w'
    return open(logname, mode)
def get_boradcast_address(outResource):
    """Return the playback (pull) address matching a push address.

    Args:
        outResource: Push address; only whether it contains ``'1945'`` matters.

    Returns:
        str: The t-aaron play URL when ``'1945'`` occurs in ``outResource``,
        otherwise the yunhengzhizao play URL.
    """
    # Original notes: rtmp://live.push.t-aaron.com/live/THSB -> Alibaba Cloud, 1945
    #                 rtmp://demopush.yunhengzhizao.cn/live/THSB -> Tencent Cloud, 1935
    if '1945' not in outResource:
        return 'rtmp://demoplay.yunhengzhizao.cn/live/THSB_HD5M'
    return 'rtmp://live.play.t-aaron.com/live/THSB'
def save_message(kafka_dir,msg):
    """Dump ``msg`` as JSON into ``<kafka_dir>/<request_id>.json``.

    Args:
        kafka_dir: Existing directory that receives the file.
        msg: Mapping with a ``'request_id'`` string; it is serialised with
            ``ensure_ascii=False``.

    Raises:
        AssertionError: If ``kafka_dir`` does not exist.
    """
    target = os.path.join(kafka_dir, msg['request_id'] + '.json')
    assert os.path.exists(kafka_dir)
    with open(target, 'w') as handle:
        json.dump(msg, handle, ensure_ascii=False)
def get_push_address(outResource):
    """Map a remote push address to the local RTMP relay address.

    The local port is chosen from substrings of ``outResource``:

    * contains ``'t-aaron'``: the first matching channel marker
      (``THSBa`` ... ``THSBh``) selects its port; with no marker, 1945;
    * otherwise: 1935.

    Args:
        outResource: Remote push address.

    Returns:
        str: ``'rtmp://127.0.0.1:<port>/live/test'``.
    """
    # Original notes:
    #   rtmp://live.push.t-aaron.com/live/THSB      -> Alibaba Cloud, 1945
    #   rtmp://demopush.yunhengzhizao.cn/live/THSB  -> Tencent Cloud, 1935
    #   terminal push address: rtmp://live.push.t-aaron.com/live/THSAa
    #   terminal pull address: rtmp://live.play.t-aaron.com/live/THSAa_hd
    #   AI push address:       rtmp://live.push.t-aaron.com/live/THSBa
    #   AI pull address:       rtmp://live.play.t-aaron.com/live/THSBa_hd
    # Checked in this order, so the first marker found wins (same as the
    # original if/elif chain).
    channel_ports = (
        ('THSBa', 1975),
        ('THSBb', 1991),
        ('THSBc', 1992),
        ('THSBd', 1993),
        ('THSBe', 1994),
        ('THSBf', 1995),
        ('THSBg', 1996),
        ('THSBh', 1997),
    )
    if 't-aaron' in outResource:
        port = 1945
        for marker, marker_port in channel_ports:
            if marker in outResource:
                port = marker_port
                break
    else:
        port = 1935
    # The former trailing ``return outResource`` was unreachable and is gone.
    return 'rtmp://127.0.0.1:%d/live/test' % (port)
def getAllRecord_poll(consumer):
    """Poll the consumer once and return all fetched records as a flat list.

    ``consumer.poll(5000)`` is expected to return a mapping whose values are
    lists of records; the lists are concatenated in the mapping's iteration
    order.

    The original version flattened the mapping and then extended the result
    with every list a second time, so each record was returned twice.

    Args:
        consumer: Object exposing ``poll(timeout_ms)``.

    Returns:
        list: Every polled record exactly once; empty when nothing arrived.
    """
    msgs = consumer.poll(5000)
    return [record for records in msgs.values() for record in records]
def getAllRecords(consumer,topics):
    """Read the messages still pending on the first two topics.

    The number of pending messages is the sum of ``get_left_cnt`` over
    ``topics[0:2]``. That many messages are then taken by iterating
    ``consumer``, committing after each one.

    Args:
        consumer: Consumer to iterate and commit on.
        topics: Sequence of topic names; only the first two are counted.

    Returns:
        list: The consumed messages; empty when nothing is pending.
    """
    pending = sum(get_left_cnt(consumer, topic) for topic in topics[0:2])
    if pending == 0:
        return []
    records = []
    for index, record in enumerate(consumer):
        consumer.commit()
        records.append(record)
        # Stop once the expected number of messages has been read; the loop
        # also ends if the consumer iterator itself stops.
        if index == pending - 1:
            break
    return records
def get_left_cnt(consumer,topic):
    """Count the messages of ``topic`` that lie beyond the committed offsets.

    Computed as the sum of the end offsets of all partitions minus the sum of
    the committed offsets; partitions without a committed offset (``None``)
    contribute nothing to the committed sum.

    Args:
        consumer: Consumer exposing ``partitions_for_topic``, ``end_offsets``
            and ``committed``.
        topic: Topic name.

    Returns:
        int: End-offset total minus committed-offset total.
    """
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    # Total: end offset of every partition.
    end_total = sum(consumer.end_offsets(partitions).values())
    # Current: committed offset of every partition that has one.
    committed_total = 0
    for partition in partitions:
        offset = consumer.committed(partition)
        if offset is not None:
            committed_total += offset
    return end_total - committed_total
def view_bar(num, total,time1,prefix='prefix'):
    """Draw a 30-character progress bar on stdout, overwriting the current line.

    Args:
        num: Number of items done.
        total: Total number of items. A total of 0 is drawn as an empty bar
            instead of raising ``ZeroDivisionError``.
        time1: Start time in ``time.time()`` seconds, used for the elapsed time.
        prefix: Text printed in front of the bar.
    """
    rate = num / total if total else 0.0
    elapsed = time.time() - time1
    filled = int(rate * 30)
    bar = '\r %s %d / %d [%s%s] %.2f s' % (
        prefix, num, total, ">" * filled, " " * (30 - filled), elapsed,
    )
    sys.stdout.write(bar)
    sys.stdout.flush()
def get_total_cnt(inSource):
    """Return the frame count and frame rate reported for a video source.

    Args:
        inSource: Path or URL handed to ``cv2.VideoCapture``.

    Returns:
        tuple: ``(cnt, fps)`` as returned by ``VideoCapture.get``.

    Raises:
        AssertionError: If the source cannot be opened.
    """
    capture = cv2.VideoCapture(inSource)
    assert capture.isOpened()
    # Property id 7 is the frame count (cv2.CAP_PROP_FRAME_COUNT in OpenCV).
    frame_total = capture.get(7)
    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    capture.release()
    return frame_total, frame_rate
def check_stream(inSource,producer,par,msg,outStrList ,fp_log,logger,line='000',thread='detector',timeMs=120,):
    """Wait until a stream becomes readable, sending heartbeats meanwhile.

    The source is probed up to ``(timeMs - 1) // 10 + 1`` times. A probe
    succeeds when ``cv2.VideoCapture`` opens the source and ``get_fps_rtmp``
    reports it as valid. After every failed probe a "Waiting stream" line is
    logged, the function sleeps 10 seconds, and on every third failed probe
    (0, 3, 6, ...) ``msg`` is sent through ``send_kafka``.

    Args:
        inSource: Stream address to probe.
        producer: Producer forwarded to ``send_kafka``.
        par: Kafka parameters forwarded to ``send_kafka``.
        msg: Heartbeat message forwarded to ``send_kafka``.
        outStrList: Log messages forwarded to ``send_kafka``.
        fp_log: Open log file.
        logger: Logger name written into the log lines.
        line: Line marker written into the log lines.
        thread: Thread name written into the log lines.
        timeMs: Waiting budget; each attempt covers 10 of its units and is
            followed by a 10-second sleep, so despite the name it effectively
            counts seconds.

    Returns:
        bool: ``True`` as soon as a probe succeeds, ``False`` if none did.
    """
    attempts = (timeMs - 1) // 10 + 1
    for icap in range(attempts):
        cap = cv2.VideoCapture(inSource)
        try:
            ready = cap.isOpened() and get_fps_rtmp(inSource, video=False)[0]
        finally:
            # Always release the probe handle; the original only released it
            # on success and kept it open across the sleep below.
            cap.release()
        if ready:
            return True
        outstr = 'Waiting stream %d s' % (10 * icap)
        writeELK_log(msg=outstr,fp=fp_log,thread=thread,line=line,logger=logger)
        time.sleep(10)
        if icap % 3 == 0:
            send_kafka(producer,par,msg,outStrList,fp_log,logger=logger,line=line,thread=thread )
    return False
def get_fps_rtmp(inSource,video=False):
    """Probe a video source and return its basic properties.

    Args:
        inSource: Path or URL handed to ``cv2.VideoCapture``.
        video: When true, the frame count (property id 7) is read as well;
            otherwise the count is reported as 0.

    Returns:
        tuple: ``(ok, [fps, width, height, cnt])`` with all values rounded to
        the nearest int. ``ok`` is ``False`` and the list is ``[0, 0, 0, 0]``
        when the source cannot be opened, when ``width * height`` is 0, when
        the reported fps exceeds 30, or when a value cannot be converted to
        an int.
    """
    failure = (False, [0, 0, 0, 0])
    cap = cv2.VideoCapture(inSource)
    try:
        if not cap.isOpened():
            print('#####error url:', inSource)
            return failure
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
        height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
        cnt = cap.get(7) if video else 0
    finally:
        # Release on every path; the original leaked the capture when it
        # returned early on bad dimensions or fps.
        cap.release()
    if width * height == 0 or fps > 30:
        return failure
    try:
        return True, [int(value + 0.5) for value in (fps, width, height, cnt)]
    except (TypeError, ValueError, OverflowError):
        # e.g. NaN or infinite values reported by the backend.
        return failure