from aliyunsdkvod.request.v20170321 import GetPlayInfoRequest import json import traceback from aliyunsdkcore.client import AcsClient from PIL import Image import numpy as np import cv2 import base64 import io,os,copy import requests import time,json import string,random import glob,string,sys from multiprocessing import Process,Queue import oss2 from kafka import KafkaProducer, KafkaConsumer from voduploadsdk.UploadVideoRequest import UploadVideoRequest from voduploadsdk.AliyunVodUtils import * from voduploadsdk.AliyunVodUploader import AliyunVodUploader from datetime import datetime, date, timedelta def get_today(): return date.today().strftime("%Y-%m-%d") def get_yesterday(beforeday=-1): return (date.today() + timedelta(days =beforeday)).strftime("%Y-%m-%d") def get_videoUurl(videoBakDir,filename): ###七天时间内 potentialUrls=[ os.path.join( videoBakDir,get_yesterday(beforeday=-x),filename) for x in range(7) ] existsList=[os.path.exists(x ) for x in potentialUrls] for i,flag in enumerate(existsList): if flag: return potentialUrls[i] return potentialUrls[0] def getNamedic(jsonfile): with open(jsonfile) as fp: dataDic=json.load(fp) #"labelnames":["排口","排污口","水生植被","漂浮物","其它"], #"labelIndexs":["SL014","SL011","SL013","SL001","SL001" ] assert 'labelnames' in dataDic.keys() , 'labelnames is not the key in %s'%(jsonfile) assert 'labelIndexs' in dataDic.keys() , 'labelIndexs is not the key in %s'%(jsonfile) assert len(dataDic['labelnames'])==len(dataDic['labelIndexs']) nameDic={} for key,value in zip(dataDic['labelnames'],dataDic['labelIndexs']): nameDic[key]=value return nameDic def get_play_info(clt, videoId): request = GetPlayInfoRequest.GetPlayInfoRequest() request.set_accept_format('JSON') request.set_VideoId(videoId) request.set_AuthTimeout(3600*5) response = json.loads(clt.do_action_with_exception(request)) return response def create_status_msg(msg_dict_off,taskInfos,sts='waiting'): msg= copy.deepcopy(msg_dict_off) msg=update_json(taskInfos,msg,offkeys=["request_id"] ) msg['status']=sts msg = json.dumps(msg, ensure_ascii=False) return msg # 填入AccessKey信息 def init_vod_client(accessKeyId, accessKeySecret): regionId = 'cn-shanghai' # 点播服务接入地域 connectTimeout = 3 # 连接超时,单位为秒 return AcsClient(accessKeyId, accessKeySecret, regionId, auto_retry=True, max_retry_time=3, timeout=connectTimeout) def update_json(jsonOri,jsonNew,offkeys=["request_id" ]): #{'biz_id': 'hehuzhang', 'mod_id': 'ai', 'request_id': 'bblvgyntTsZCamqjuLArkiSYIbKXEeWx', 'offering_id': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'offering_type': 'mp4', 'results_base_dir': 'XJRW20220317153547', 'inSource': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'outSource': 'NO'} for key in offkeys: jsonNew[key] = jsonOri[key] return jsonNew def get_time(filename): #2021-10-09-11-44-51_frame-598-720_type-水生植被.jpg sps=filename.strip().split('_')[0] tsps=sps.split('-') return '%s-%s-%s %s:%s:%s'%(tsps[0],tsps[1],tsps[2],tsps[3],tsps[4],tsps[5]) def get_ms(time0,time1): str_time ='%.2f ms'%((time1-time0)*1000) return str_time def get_urls( platform_query_url,fp_log ): try: if os.path.exists(platform_query_url): #print('###line49') with open('SendLog/platformQuery.json','r') as fp: res = json.load(fp) else: res = requests.get(platform_query_url,timeout=10).json() #print('###line54') questionUrl = res['data']['questionUrl'] ###直播流时,问题图片的推送地址 offlineUrl = res['data']['offlineUrl'] ###http离线视频时,问题图片的推送地址 except Exception as ee: timestr=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print('###### %s: file:send_transfer: error %s ,url:%s #####'%(timestr,ee,platform_query_url)) outstr = '\n %s ###### get url platform error : update error:%s , url:%s'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,ee,platform_query_url) fp_log.write(outstr);fp_log.flush() questionUrl="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion" offlineUrl ="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion" return questionUrl,offlineUrl def parse_filename(filename_base): #etc:2022-01-13-16-04-17_frame-823-1440_type-水生植被_hgYFEulc0dPIrG1S_s-off-XJRW20220113154959_AI.jpg uid =filename_base.split('.')[0].split('_')[3].strip() sourceType=filename_base.split('_')[4].split('-')[1] sourceId=filename_base.split('_')[4].split('-')[2] typename=filename_base.split('.')[0].split('_')[2].split('-')[1].strip() return uid,sourceType,sourceId,typename def b64encode_function(filename, filename_OR): if os.path.exists(filename): image_ori=cv2.imread(filename) image_ori_OR=cv2.imread(filename_OR) else: image_ori = filename.copy() image_ori_OR = image_ori_OR.copy() image_pngcode = cv2.imencode('.jpg',image_ori)[-1] image_pngcode_OR = cv2.imencode('.jpg',image_ori_OR)[-1] image_code = str(base64.b64encode(image_pngcode))[2:-1] image_code_OR = str(base64.b64encode(image_pngcode_OR))[2:-1] return image_code, image_code_OR def JsonSend(parIn): fp_log = parIn['fp_log'] try: response=requests.post(parIn['api'],json=parIn['input_'],timeout=10).json() t3 = time.time() print('\n file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s, size:%.2f M \n'%(parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],api,response['code'],parIn['sizeImage'])) outstr = '%s file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s,size:%.2f M ,%s\n'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],parIn['api'],response['code'],parIn['sizeImage'],parIn['dic_str']) fp_log.write(outstr);fp_log.flush() except Exception as ee: print('\n ######file:%s: upload error:%s,size:%.2f M'%(parIn['filename_base'],ee, parIn['sizeImage'])) outstr = '\n%s ###### file:%s: upload error:%s , size:%.2f M'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,parIn['filename_base'],ee,parIn['sizeImage']) fp_log.write(outstr);fp_log.flush() def dic2str(dic): st='' for key in dic.keys(): st='%s %s:%s,'%(st,key,dic[key]) return st def createJsonInput(filename,offlineUrl,questionUrl): flag = True filename_base = os.path.basename(filename) filename_OR=filename.replace('_AI.','_OR.') if not os.path.exists(filename_OR ): return False uid,sourceType, sourceId,typename = parse_filename(filename_base) if (typename not in name_dic.keys()) or (typename == '排口'): return False api = questionUrl if sourceType=='live' else offlineUrl time_str = get_time(filename_base) input_ ={ 'imgName':os.path.basename(filename), 'imgNameOriginal':os.path.basename(filename_OR), 'time':time_str, 'fid':uid, ###随机16位字符 'type':name_dic[typename],###这次先采用 ["排口","污口","水生植被","漂浮物","其它"] 'typeId':nameID_dic[typename] } if sourceType!='live': input_['code']=sourceId;###只有离线视频才需要code, dic_str = dic2str(input_) t1 = time.time() image_code, image_code_OR = b64encode_function(filename, filename_OR) input_['imgData']=image_code input_['imgDataOriginal']=image_code_OR sizeImage = (len(image_code) + len(image_code_OR) )/1000000.0 parOut={};parOut['flag']=True;parOut['input_']=input_; parOut['sizeImage']=sizeImage;parOut['dic_str']=dic_str; parOut['filename']=filename;parOut['filename_OR']=filename_OR; parOut['api']=api ; parOut['t1']=t1 ; parOut['filename_base']= filename_base return parOut def getLogFileFp(streamName): logname ='SendLog/'+ time.strftime("%Y-%m-%d", time.localtime())+'_%s.txt'%(streamName) if os.path.exists(logname): fp_log = open(logname,'a+') else: fp_log = open(logname,'w') return def lodaMsgInfos(jsonDir,msgId): jsonUrl = os.path.join(jsonDir,msgId+'.json') with open(jsonUrl,'r') as fp: data=json.load(fp) return data def parse_filename_for_oss(name): splts=name.split('_') typename=splts[2].split('-')[1].strip() msgId=splts[4].split('-')[3] onLineType=splts[4].split('-')[1] return typename,msgId,onLineType def percentage(consumed_bytes, total_bytes): if total_bytes: rate = int(100 * (float(consumed_bytes) / float(total_bytes))) print('\r{0}% '.format(rate), end='') sys.stdout.flush()