|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- from PIL import Image
- import numpy as np
- import cv2
- import base64
- import io,os
- import requests
- import time,json
- import string,random
- import glob,string,sys
- from multiprocessing import Process,Queue
- import oss2
- from kafka import KafkaProducer, KafkaConsumer
- ##for CeKanYuan
- #10月21日,通过图像名称判断,是那个平台。方式不好。
- #10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。
- #3月18日,采用OSS阿里云存储桶
- #platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
- platform_query_url='SendLog/platformQuery.json'
- api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
- #api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
-
- ##这套名字,是联通的。
- name_dic={
- "排口":"入河、湖排口",
- "疑似污口": "入河、湖排口",
- "水生植被": "水生植物",
- "漂浮物": "有大面积漂物",
- "结束": "结束",
- '其它' :'其它'
- }
- ## for TH river
- ##这套代码是河长制度的。
- nameID_dic={
- "排口":'00000',
- "疑似污口": '8378',
- "水生植被": '8380',
- "漂浮物": '8368',
- "结束":'9999',
- '其它':'8888'
- }
-
- def get_time(filename):
- #2021-10-09-11-44-51_frame-598-720_type-水生植被.jpg
- sps=filename.strip().split('_')[0]
- tsps=sps.split('-')
- return '%s-%s-%s %s:%s:%s'%(tsps[0],tsps[1],tsps[2],tsps[3],tsps[4],tsps[5])
- def get_ms(time0,time1):
- str_time ='%.2f ms'%((time1-time0)*1000)
- return str_time
-
- def get_urls( platform_query_url,fp_log ):
- try:
- if os.path.exists(platform_query_url):
- #print('###line49')
- with open('SendLog/platformQuery.json','r') as fp:
- res = json.load(fp)
- else:
- res = requests.get(platform_query_url,timeout=10).json()
- #print('###line54')
- questionUrl = res['data']['questionUrl'] ###直播流时,问题图片的推送地址
- offlineUrl = res['data']['offlineUrl'] ###http离线视频时,问题图片的推送地址
- except Exception as ee:
- timestr=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- print('###### %s: file:send_transfer: error %s ,url:%s #####'%(timestr,ee,platform_query_url))
- outstr = '\n %s ###### get url platform error : update error:%s , url:%s'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,ee,platform_query_url)
- fp_log.write(outstr);fp_log.flush()
- questionUrl="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
- offlineUrl ="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
- return questionUrl,offlineUrl
- def parse_filename(filename_base):
- #etc:2022-01-13-16-04-17_frame-823-1440_type-水生植被_hgYFEulc0dPIrG1S_s-off-XJRW20220113154959_AI.jpg
- uid =filename_base.split('.')[0].split('_')[3].strip()
- sourceType=filename_base.split('_')[4].split('-')[1]
- sourceId=filename_base.split('_')[4].split('-')[2]
- typename=filename_base.split('.')[0].split('_')[2].split('-')[1].strip()
- return uid,sourceType,sourceId,typename
- def b64encode_function(filename, filename_OR):
- if os.path.exists(filename):
- image_ori=cv2.imread(filename)
- image_ori_OR=cv2.imread(filename_OR)
- else:
- image_ori = filename.copy()
- image_ori_OR = image_ori_OR.copy()
- image_pngcode = cv2.imencode('.jpg',image_ori)[-1]
- image_pngcode_OR = cv2.imencode('.jpg',image_ori_OR)[-1]
- image_code = str(base64.b64encode(image_pngcode))[2:-1]
- image_code_OR = str(base64.b64encode(image_pngcode_OR))[2:-1]
- return image_code, image_code_OR
- def JsonSend(parIn):
-
- fp_log = parIn['fp_log']
- try:
- response=requests.post(parIn['api'],json=parIn['input_'],timeout=10).json()
- t3 = time.time()
- print('\n file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s, size:%.2f M \n'%(parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],api,response['code'],parIn['sizeImage']))
- outstr = '%s file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s,size:%.2f M ,%s\n'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],parIn['api'],response['code'],parIn['sizeImage'],parIn['dic_str'])
- fp_log.write(outstr);fp_log.flush()
-
- except Exception as ee:
- print('\n ######file:%s: upload error:%s,size:%.2f M'%(parIn['filename_base'],ee, parIn['sizeImage']))
- outstr = '\n%s ###### file:%s: upload error:%s , size:%.2f M'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,parIn['filename_base'],ee,parIn['sizeImage'])
- fp_log.write(outstr);fp_log.flush()
-
-
- def dic2str(dic):
- st=''
- for key in dic.keys():
- st='%s %s:%s,'%(st,key,dic[key])
- return st
- def createJsonInput(filename,offlineUrl,questionUrl):
- flag = True
- filename_base = os.path.basename(filename)
- filename_OR=filename.replace('_AI.','_OR.')
- if not os.path.exists(filename_OR ):
- return False
-
- uid,sourceType, sourceId,typename = parse_filename(filename_base)
- if (typename not in name_dic.keys()) or (typename == '排口'):
- return False
- api = questionUrl if sourceType=='live' else offlineUrl
-
- time_str = get_time(filename_base)
- input_ ={
- 'imgName':os.path.basename(filename),
- 'imgNameOriginal':os.path.basename(filename_OR),
- 'time':time_str,
- 'fid':uid, ###随机16位字符
- 'type':name_dic[typename],###这次先采用 ["排口","污口","水生植被","漂浮物","其它"]
- 'typeId':nameID_dic[typename]
-
- }
- if sourceType!='live':
- input_['code']=sourceId;###只有离线视频才需要code,
-
- dic_str = dic2str(input_)
- t1 = time.time()
-
- image_code, image_code_OR = b64encode_function(filename, filename_OR)
- input_['imgData']=image_code
- input_['imgDataOriginal']=image_code_OR
-
- sizeImage = (len(image_code) + len(image_code_OR) )/1000000.0
-
- parOut={};parOut['flag']=True;parOut['input_']=input_;
- parOut['sizeImage']=sizeImage;parOut['dic_str']=dic_str;
- parOut['filename']=filename;parOut['filename_OR']=filename_OR;
- parOut['api']=api ; parOut['t1']=t1 ; parOut['filename_base']= filename_base
- return parOut
-
- def getLogFileFp(streamName):
- logname ='SendLog/'+ time.strftime("%Y-%m-%d", time.localtime())+'_%s.txt'%(streamName)
- if os.path.exists(logname):
- fp_log = open(logname,'a+')
- else:
- fp_log = open(logname,'w')
- return
-
- def lodaMsgInfos(jsonDir,msgId):
- jsonUrl = os.path.join(jsonDir,msgId+'.json')
- with open(jsonUrl,'r') as fp:
- data=json.load(fp)
- return data
-
- def parse_filename_for_oss(name):
- splts=name.split('_')
- typename=splts[2].split('-')[1].strip()
- msgId=splts[4].split('-')[3]
- onLineType=splts[4].split('-')[1]
- return typename,msgId,onLineType
-
- msg_dict_off={
- "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
- "biz_id":"hehuzhang",#业务标识
- "mod_id":"ai",#模型标识
- "status":"running",#任务状态
- "type":str(1),#数据类型:1图片 2视频
- "error":9999,#错误信息
- "results":[#问题结果
- {
- "original_url":"",#原图地址
- "sign_url":"",#AI标记地址
- "category_id":"",#分类标识
- "description":"",#问题描述
- "time":"",#时间戳
- }
- ]
- }
-
- msg_dict_on={
- "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
- "biz_id":"hehuzhang",#业务标识
- "mod_id":"qi",#模型标识
- "status":"running",#任务状态
- "type":str(2),#数据类型:1图片 2视频
- "error":9999,#错误信息
- "results":[#问题结果
- {
- "original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
- "sign_url":"",#识别后视频地址
- }
- ]
- }
-
- def update_json(jsonOri,jsonNew,offkeys=["msg_id","biz_id" ,"mod_id" ]):
- #{'biz_id': 'hehuzhang', 'mod_id': 'ai', 'msg_id': 'bblvgyntTsZCamqjuLArkiSYIbKXEeWx', 'offering_id': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'offering_type': 'mp4', 'results_base_dir': 'XJRW20220317153547', 'inSource': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'outSource': 'NO'}
- for key in offkeys:
- jsonNew[key] = jsonOri[key]
- return jsonNew
- def test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar):
-
-
- time0_0 = time.time()
- logname ='SendLog/'+ time.strftime("%Y-%m-%d.txt", time.localtime())
- if os.path.exists(logname):
- fp_log = open(logname,'a+')
- else:
- fp_log = open(logname,'w')
- ifind=0
- time0_0 = time.time()
-
- producer = KafkaProducer(
- bootstrap_servers=kafkaPar['boostServer'],#tencent yun
- value_serializer=lambda v: v.encode('utf-8'))
-
-
- ###登陆准备存储桶
- auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
- # Endpoint以杭州为例,其它Region请按实际情况填写。
- bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
-
-
- while True:
- #filelist = os.listdir(indir)
- filelist_AI = sorted(glob.glob('%s/*_AI.*'%(indir)),key=os.path.getmtime)
- filelist = filelist_AI
-
- if len(filelist)!=0:
- time0 = time.time()
- for filename in filelist[0:2]:
- filename_base = os.path.basename(filename)
- ##解析文件名
- typename,msgId,onLineType = parse_filename_for_oss(filename_base)
-
- ##存储文件
- filename_OR=filename.replace('_AI.','_OR.')
-
-
-
- if typename!='结束':
- ObjectName_AI=os.path.join(ossPar['bucketName'],os.path.basename(filename))
- ObjectName_OR=os.path.join(ossPar['bucketName'],os.path.basename(filename_OR))
- bucket.put_object_from_file(ObjectName_AI, filename)
- bucket.put_object_from_file(ObjectName_OR, filename_OR)
- taskInfos = lodaMsgInfos(jsonDir,msgId)
- #print(taskInfos)
- ##发送返回信息
- #if onLineType=='off':
- msg = msg_dict_off
- msg['results'][0]['original_url']= ObjectName_OR
- msg['results'][0]['sign_url']= ObjectName_AI
- msg['results'][0]['category_id']= nameID_dic[typename]
- msg['results'][0]['description']= typename
- msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- msg = update_json(taskInfos,msg)
- else:
- msg = msg_dict_on
- videoList = sorted(glob.glob('%s/*'%(videoBakDir)),key=os.path.getmtime)
- videoName = os.path.basename(videoList[0])
- msg["status"]="success";msg["msg_id"]=msgId
- ObjectName_AI=os.path.join(ossPar['bucketName'],videoName)
- bucket.put_object_from_file(ObjectName_AI, videoList[0])
- msg['results'][0]['original_url']= ObjectName_AI
- msg['results'][0]['sign_url']= ObjectName_AI###最新的视频文件
-
- print('###'*3,'Send:',filename)
- msg = json.dumps(msg, ensure_ascii=False)
- future = producer.send(
- kafkaPar['topic'],
- msg
- )
- print('***'*20,' Send transfer ',onLineType,msg)
-
- ##上传后的图片,移走到另外一个文件夹###
- cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
- cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
-
- else:
- time.sleep(1)
- fp_log.close()
-
- if __name__=='__main__':
- indir='problems/images_tmp'
- outdir='problems/images_save'
- jsonDir = 'mintors/kafka/'
- videoBakDir='../../data/video_live_bak/1945'
- ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com',
- 'AId':'LTAI5tSJ62TLMUb4SZuf285A',
- 'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7',
- 'bucketName':'ta-tech-image',
- }
- #kafkaPar={'boostServer':['212.129.223.66:9092'],'topic':'testReturn'}
- kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'}
- test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar)
|