from PIL import Image import numpy as np import cv2 import base64 import io,os import requests import time,json import string,random import glob,string,sys from multiprocessing import Process,Queue import oss2 from kafka import KafkaProducer, KafkaConsumer ##for CeKanYuan #10月21日,通过图像名称判断,是那个平台。方式不好。 #10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。 #3月18日,采用OSS阿里云存储桶 #platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo' platform_query_url='SendLog/platformQuery.json' api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion' #api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion' ##这套名字,是联通的。 name_dic={ "排口":"入河、湖排口", "疑似污口": "入河、湖排口", "水生植被": "水生植物", "漂浮物": "有大面积漂物", "结束": "结束", '其它' :'其它' } ## for TH river ##这套代码是河长制度的。 nameID_dic={ "排口":'00000', "疑似污口": '8378', "水生植被": '8380', "漂浮物": '8368', "结束":'9999', '其它':'8888' } def get_time(filename): #2021-10-09-11-44-51_frame-598-720_type-水生植被.jpg sps=filename.strip().split('_')[0] tsps=sps.split('-') return '%s-%s-%s %s:%s:%s'%(tsps[0],tsps[1],tsps[2],tsps[3],tsps[4],tsps[5]) def get_ms(time0,time1): str_time ='%.2f ms'%((time1-time0)*1000) return str_time def get_urls( platform_query_url,fp_log ): try: if os.path.exists(platform_query_url): #print('###line49') with open('SendLog/platformQuery.json','r') as fp: res = json.load(fp) else: res = requests.get(platform_query_url,timeout=10).json() #print('###line54') questionUrl = res['data']['questionUrl'] ###直播流时,问题图片的推送地址 offlineUrl = res['data']['offlineUrl'] ###http离线视频时,问题图片的推送地址 except Exception as ee: timestr=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print('###### %s: file:send_transfer: error %s ,url:%s #####'%(timestr,ee,platform_query_url)) outstr = '\n %s ###### get url platform error : update error:%s , url:%s'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,ee,platform_query_url) fp_log.write(outstr);fp_log.flush() questionUrl="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion" offlineUrl ="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion" return questionUrl,offlineUrl def parse_filename(filename_base): #etc:2022-01-13-16-04-17_frame-823-1440_type-水生植被_hgYFEulc0dPIrG1S_s-off-XJRW20220113154959_AI.jpg uid =filename_base.split('.')[0].split('_')[3].strip() sourceType=filename_base.split('_')[4].split('-')[1] sourceId=filename_base.split('_')[4].split('-')[2] typename=filename_base.split('.')[0].split('_')[2].split('-')[1].strip() return uid,sourceType,sourceId,typename def b64encode_function(filename, filename_OR): if os.path.exists(filename): image_ori=cv2.imread(filename) image_ori_OR=cv2.imread(filename_OR) else: image_ori = filename.copy() image_ori_OR = image_ori_OR.copy() image_pngcode = cv2.imencode('.jpg',image_ori)[-1] image_pngcode_OR = cv2.imencode('.jpg',image_ori_OR)[-1] image_code = str(base64.b64encode(image_pngcode))[2:-1] image_code_OR = str(base64.b64encode(image_pngcode_OR))[2:-1] return image_code, image_code_OR def JsonSend(parIn): fp_log = parIn['fp_log'] try: response=requests.post(parIn['api'],json=parIn['input_'],timeout=10).json() t3 = time.time() print('\n file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s, size:%.2f M \n'%(parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],api,response['code'],parIn['sizeImage'])) outstr = '%s file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s,size:%.2f M ,%s\n'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],parIn['api'],response['code'],parIn['sizeImage'],parIn['dic_str']) fp_log.write(outstr);fp_log.flush() except Exception as ee: print('\n ######file:%s: upload error:%s,size:%.2f M'%(parIn['filename_base'],ee, parIn['sizeImage'])) outstr = '\n%s ###### file:%s: upload error:%s , size:%.2f M'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,parIn['filename_base'],ee,parIn['sizeImage']) fp_log.write(outstr);fp_log.flush() def dic2str(dic): st='' for key in dic.keys(): st='%s %s:%s,'%(st,key,dic[key]) return st def createJsonInput(filename,offlineUrl,questionUrl): flag = True filename_base = os.path.basename(filename) filename_OR=filename.replace('_AI.','_OR.') if not os.path.exists(filename_OR ): return False uid,sourceType, sourceId,typename = parse_filename(filename_base) if (typename not in name_dic.keys()) or (typename == '排口'): return False api = questionUrl if sourceType=='live' else offlineUrl time_str = get_time(filename_base) input_ ={ 'imgName':os.path.basename(filename), 'imgNameOriginal':os.path.basename(filename_OR), 'time':time_str, 'fid':uid, ###随机16位字符 'type':name_dic[typename],###这次先采用 ["排口","污口","水生植被","漂浮物","其它"] 'typeId':nameID_dic[typename] } if sourceType!='live': input_['code']=sourceId;###只有离线视频才需要code, dic_str = dic2str(input_) t1 = time.time() image_code, image_code_OR = b64encode_function(filename, filename_OR) input_['imgData']=image_code input_['imgDataOriginal']=image_code_OR sizeImage = (len(image_code) + len(image_code_OR) )/1000000.0 parOut={};parOut['flag']=True;parOut['input_']=input_; parOut['sizeImage']=sizeImage;parOut['dic_str']=dic_str; parOut['filename']=filename;parOut['filename_OR']=filename_OR; parOut['api']=api ; parOut['t1']=t1 ; parOut['filename_base']= filename_base return parOut def getLogFileFp(streamName): logname ='SendLog/'+ time.strftime("%Y-%m-%d", time.localtime())+'_%s.txt'%(streamName) if os.path.exists(logname): fp_log = open(logname,'a+') else: fp_log = open(logname,'w') return def lodaMsgInfos(jsonDir,msgId): jsonUrl = os.path.join(jsonDir,msgId+'.json') with open(jsonUrl,'r') as fp: data=json.load(fp) return data def parse_filename_for_oss(name): splts=name.split('_') typename=splts[2].split('-')[1].strip() msgId=splts[4].split('-')[3] onLineType=splts[4].split('-')[1] return typename,msgId,onLineType msg_dict_off={ "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识 "biz_id":"hehuzhang",#业务标识 "mod_id":"ai",#模型标识 "status":"running",#任务状态 "type":str(1),#数据类型:1图片 2视频 "error":9999,#错误信息 "results":[#问题结果 { "original_url":"",#原图地址 "sign_url":"",#AI标记地址 "category_id":"",#分类标识 "description":"",#问题描述 "time":"",#时间戳 } ] } msg_dict_on={ "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识 "biz_id":"hehuzhang",#业务标识 "mod_id":"qi",#模型标识 "status":"running",#任务状态 "type":str(2),#数据类型:1图片 2视频 "error":9999,#错误信息 "results":[#问题结果 { "original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传) "sign_url":"",#识别后视频地址 } ] } def update_json(jsonOri,jsonNew,offkeys=["msg_id","biz_id" ,"mod_id" ]): #{'biz_id': 'hehuzhang', 'mod_id': 'ai', 'msg_id': 'bblvgyntTsZCamqjuLArkiSYIbKXEeWx', 'offering_id': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'offering_type': 'mp4', 'results_base_dir': 'XJRW20220317153547', 'inSource': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'outSource': 'NO'} for key in offkeys: jsonNew[key] = jsonOri[key] return jsonNew def test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar): time0_0 = time.time() logname ='SendLog/'+ time.strftime("%Y-%m-%d.txt", time.localtime()) if os.path.exists(logname): fp_log = open(logname,'a+') else: fp_log = open(logname,'w') ifind=0 time0_0 = time.time() producer = KafkaProducer( bootstrap_servers=kafkaPar['boostServer'],#tencent yun value_serializer=lambda v: v.encode('utf-8')) ###登陆准备存储桶 auth = oss2.Auth(ossPar['AId'], ossPar['ASt']) # Endpoint以杭州为例,其它Region请按实际情况填写。 bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName']) while True: #filelist = os.listdir(indir) filelist_AI = sorted(glob.glob('%s/*_AI.*'%(indir)),key=os.path.getmtime) filelist = filelist_AI if len(filelist)!=0: time0 = time.time() for filename in filelist[0:2]: filename_base = os.path.basename(filename) ##解析文件名 typename,msgId,onLineType = parse_filename_for_oss(filename_base) ##存储文件 filename_OR=filename.replace('_AI.','_OR.') if typename!='结束': ObjectName_AI=os.path.join(ossPar['bucketName'],os.path.basename(filename)) ObjectName_OR=os.path.join(ossPar['bucketName'],os.path.basename(filename_OR)) bucket.put_object_from_file(ObjectName_AI, filename) bucket.put_object_from_file(ObjectName_OR, filename_OR) taskInfos = lodaMsgInfos(jsonDir,msgId) #print(taskInfos) ##发送返回信息 #if onLineType=='off': msg = msg_dict_off msg['results'][0]['original_url']= ObjectName_OR msg['results'][0]['sign_url']= ObjectName_AI msg['results'][0]['category_id']= nameID_dic[typename] msg['results'][0]['description']= typename msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) msg = update_json(taskInfos,msg) else: msg = msg_dict_on videoList = sorted(glob.glob('%s/*'%(videoBakDir)),key=os.path.getmtime) videoName = os.path.basename(videoList[0]) msg["status"]="success";msg["msg_id"]=msgId ObjectName_AI=os.path.join(ossPar['bucketName'],videoName) bucket.put_object_from_file(ObjectName_AI, videoList[0]) msg['results'][0]['original_url']= ObjectName_AI msg['results'][0]['sign_url']= ObjectName_AI###最新的视频文件 print('###'*3,'Send:',filename) msg = json.dumps(msg, ensure_ascii=False) future = producer.send( kafkaPar['topic'], msg ) print('***'*20,' Send transfer ',onLineType,msg) ##上传后的图片,移走到另外一个文件夹### cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd) cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd) else: time.sleep(1) fp_log.close() if __name__=='__main__': indir='problems/images_tmp' outdir='problems/images_save' jsonDir = 'mintors/kafka/' videoBakDir='../../data/video_live_bak/1945' ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com', 'AId':'LTAI5tSJ62TLMUb4SZuf285A', 'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7', 'bucketName':'ta-tech-image', } #kafkaPar={'boostServer':['212.129.223.66:9092'],'topic':'testReturn'} kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'} test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar)