"""Transfer AI detection results to Aliyun OSS/VOD and report them over Kafka.

The main loop (`test5`) polls a directory for ``*_AI.txt`` marker files,
uploads the matching images to an OSS bucket (or the task videos to VOD when
the marker is an end-of-task marker), publishes a result message to Kafka and
moves the processed marker files to an output directory.  A child process
(`mintor_offline_ending`) publishes heartbeat messages for offline tasks whose
end marker is waiting to be processed.
"""
from PIL import Image
import numpy as np
import cv2
import base64
import io
import os
import requests
import time
import json
import string
import random
import glob
import sys
from multiprocessing import Process, Queue
import oss2
import copy
from kafka import KafkaProducer, KafkaConsumer
from utilsK.sendUtils import *
from utilsK.masterUtils import create_logFile, wrtiteLog, writeELK_log, send_kafka
from voduploadsdk.UploadVideoRequest import UploadVideoRequest
from voduploadsdk.AliyunVodUtils import *
from voduploadsdk.AliyunVodUploader import AliyunVodUploader
import hashlib
import shutil
from kafka.errors import kafka_errors

## for CeKanYuan
# Oct 21: the platform was inferred from the image name; not a good approach.
# Oct 22: switched to querying a fixed address that returns the platform name
#         and address, polled every 2 minutes.
# Mar 18: use an Aliyun OSS bucket.
#platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
platform_query_url = 'SendLog/platformQuery.json'
api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
#api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'

## This set of names is the one used by China Unicom.
name_dic = {
    "排口": "入河、湖排口",
    "排污口": "入河、湖排口",
    "水生植被": "水生植物",
    "漂浮物": "有大面积漂物",
    "结束": "结束",
    '其它': '其它',
}

## for TH river
## Legacy hard-coded category IDs for the river-chief system, kept for
## reference only; test5 obtains the mapping from getNamedic(labelnamesFile).
# nameID_dic={
#     "排口":'00000',
#     "排污口": '8378',
#     "水生植被": '8380',
#     "漂浮物": '8368',
#     "结束":'9999',
#     "其它":'8888'
# }

# Message template deep-copied for offline results, per-image detection
# results and heartbeats.
msg_dict_off = {
    "request_id": "fflvgyntTsZCamqjuLArkiSYIbKXEeWx",  # message ID
    "status": "running",  # task status
    "type": str(2),  # message type, 1: real-time, 2: offline
    #"error":str(9999),  # error information
    "error_code": "",  # error number
    "error_msg": "",  # error description
    "progress": "",
    "results": [  # problem results
        {
            "original_url": "",  # original image address
            "sign_url": "",  # AI-annotated image address
            "category_id": "",  # category identifier
            "description": "",  # problem description
            "analyse_time": "",  # timestamp
        }
    ],
}

# Message template deep-copied for the end-of-task message of a real-time task.
msg_dict_on = {
    "request_id": "nnlvgyntTsZCamqjuLArkiSYIbKXEeWx",  # message ID
    "status": "running",  # task status
    "type": str(1),  # message type, 1: real-time, 2: offline
    "error_code": "",  # error number
    "error_msg": "",  # error description
    "progressOn": "",
    "results": [  # problem results
        {
            # original video address (left empty for offline recognition,
            # must be uploaded for real-time recognition)
            "original_url": "",
            "sign_url": "",  # address of the video after recognition
        }
    ],
}


def _make_producer(server):
    """Return a KafkaProducer for `server` that UTF-8-encodes str payloads."""
    return KafkaProducer(
        bootstrap_servers=server,
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000,
    )


def _log(log_ctx, msg, level='INFO'):
    """Write `msg` through writeELK_log, tagged with the caller's line number.

    `log_ctx` is the tuple ``(fp_log, logger, thread)``.
    """
    fp_log, logger, thread = log_ctx
    writeELK_log(msg=msg, fp=fp_log, thread=thread, level=level,
                 line=sys._getframe(1).f_lineno, logger=logger)


def _safe_mtime(path):
    """Return the mtime of `path`, or +inf when the file no longer exists."""
    try:
        return os.path.getmtime(path)
    except OSError:
        # The other process may move a file away between glob() and the
        # mtime lookup; sort such entries last instead of crashing.
        return float('inf')


def _list_marker_files(indir):
    """Return the ``*_AI.txt`` files in `indir`, oldest modification first."""
    return sorted(glob.glob('%s/*_AI.txt' % (indir)), key=_safe_mtime)


def _upload_videos(uploader, videoBakDir, video_names, log_ctx, max_tries=3):
    """Upload every video in `video_names` to VOD, retrying the whole batch.

    Returns the list of VideoId strings (same order as `video_names`) on
    success, or None when all `max_tries` attempts failed.  Each failure is
    logged at WARNING level.
    """
    for attempt in range(1, max_tries + 1):
        try:
            video_ids = []
            for video_name in video_names:
                videoUrl = get_videoUurl(videoBakDir, video_name)
                if not os.path.exists(videoUrl):
                    raise FileNotFoundError('%s not exists' % (videoUrl))
                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                videoId = uploader.uploadLocalVideo(uploadVideoRequest)
                video_ids.append(str(videoId['VideoId']))
            return video_ids
        except Exception as e:
            _log(log_ctx, 'video uploading error:%s, times:%d' % (e, attempt),
                 level='WARNING')
    return None


def _build_detection_msg(bucket, filename, typename, onLineType, taskInfos,
                         nameID_dic, log_ctx):
    """Upload the AI/original image pair to OSS and build the result message."""
    filename_OR = filename.replace('_AI.', '_OR.')
    filename_AI_image = filename.replace('.txt', '.jpg')
    filename_OR_image = filename_OR.replace('.txt', '.jpg')

    oss_dir = taskInfos['results_base_dir']
    ObjectName_AI = os.path.join(oss_dir, os.path.basename(filename_AI_image))
    ObjectName_OR = os.path.join(oss_dir, os.path.basename(filename_OR_image))
    bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
    bucket.put_object_from_file(ObjectName_OR, filename_OR_image)
    _log(log_ctx, ' oss bucket upload %s %s %s ' % ('***' * 3, 'Send:', filename))

    msg = copy.deepcopy(msg_dict_off)
    msg['type'] = str(1) if onLineType != 'off' else str(2)
    result = msg['results'][0]
    result['original_url'] = ObjectName_OR
    result['sign_url'] = ObjectName_AI
    result['category_id'] = nameID_dic[typename]
    result['description'] = typename
    result['analyse_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    return update_json(taskInfos, msg)


def _build_ending_msg(uploader, videoBakDir, typename, requestId, onLineType,
                      taskInfos, log_ctx):
    """Upload the task video(s) to VOD and build the end-of-task message.

    The message status is "success" / "timeout" (depending on `typename`)
    when the upload worked and 'failed' otherwise.
    """
    if onLineType != 'off':
        msg = copy.deepcopy(msg_dict_on)
        msg["request_id"] = requestId
        msg['type'] = str(1)
        msg['results'][0]['original_url'] = "yourAddess"
        msg['results'][0]['sign_url'] = "yourAddess"  # latest video file
        video_ids = _upload_videos(
            uploader, videoBakDir,
            [requestId + '_AI.MP4', requestId + '_OR.MP4'], log_ctx)
        if video_ids:
            VideoId_AI, VideoId_OR = video_ids
            _log(log_ctx, VideoId_OR + ',' + VideoId_AI)
            msg['results'][0]['sign_url'] = VideoId_AI
            msg['results'][0]['original_url'] = VideoId_OR
    else:
        msg = copy.deepcopy(msg_dict_off)
        msg['type'] = str(2)
        msg["request_id"] = requestId
        msg['results'][0]['original_url'] = taskInfos['original_url']
        # The existence check now lives inside the retry helper, so a missing
        # video yields a 'failed' message instead of killing the main loop.
        video_ids = _upload_videos(
            uploader, videoBakDir, [requestId + '.MP4'], log_ctx)
        if video_ids:
            _log(log_ctx, ' oss upload video over %s ' % (video_ids[0]))
            msg['results'][0]['sign_url'] = video_ids[0]  # latest video file

    if video_ids:
        msg["status"] = "success" if typename == '结束' else "timeout"
    else:
        msg['error_msg'] = 'video uploading failure'
        msg['error_code'] = '101'
        msg["status"] = 'failed'
    return msg


def _send_with_retry(producer, kafkaPar, msg, onLineType, log_ctx):
    """Publish the JSON string `msg`; on failure rebuild the producer and resend once.

    Returns the producer to use for the next message (a new object when the
    first attempt failed).
    """
    try:
        producer.send(kafkaPar['topic'], msg).get()
        _log(log_ctx, 'kafka send:%s msg:%s producer status:%s'
             % (onLineType, msg, producer.bootstrap_connected()))
    except Exception as e:
        _log(log_ctx, 'kafka ERROR:%s' % (str(e)), level='WARNING')
        producer.close()
        producer = _make_producer(kafkaPar['boostServer'])
        try:
            producer.send(kafkaPar['topic'], msg).get()
        except Exception as e:
            _log(log_ctx, 'kafka resend ERROR:%s' % (str(e)), level='ERROR')
    return producer


def _move_to_dir(src, dst_dir, log_ctx):
    """Move `src` into `dst_dir`, replacing a same-named file; log on failure."""
    try:
        # An explicit destination path keeps `mv`-like overwrite semantics.
        shutil.move(src, os.path.join(dst_dir, os.path.basename(src)))
    except OSError as e:
        _log(log_ctx, 'move %s to %s failed:%s' % (src, dst_dir, e), level='WARNING')


def mintor_offline_ending(parIn):
    """Heartbeat loop, run as a child process of `test5`; never returns.

    Reads from `parIn`: 'indir' (directory polled for ``*_AI.txt`` files),
    'server' and 'topic' (Kafka), 'fp_log' and 'logger' (logging) and
    'timeInterval' (pause between polls, passed to time.sleep).

    On every poll, a heartbeat message with status 'running' is sent for each
    offline ('off') request whose end marker ('结束') is present in 'indir'.
    """
    indir, fp_log, logger = parIn['indir'], parIn['fp_log'], parIn['logger']
    par_kafka = {'server': parIn['server'], 'topic': parIn['topic']}
    thread = 'Send-tranfer-oss:mintor-offline-ending'
    log_ctx = (fp_log, logger, thread)
    # NOTE(review): the caller fills this from par['hearBeatTimeMs'], but it is
    # passed to time.sleep(), i.e. interpreted as seconds -- confirm the unit.
    time_interval = parIn['timeInterval']

    producer = _make_producer(par_kafka['server'])
    outStrList = {}
    _log(log_ctx, 'child processs starts')
    time_ss0 = time.time()
    while True:
        # Collect the offline requests whose end marker is still waiting.
        off_msgs = []
        for filename in _list_marker_files(indir):
            typename, requestId, onLineType = parse_filename_for_oss(
                os.path.basename(filename))
            if onLineType == 'off' and typename == '结束':
                off_msgs.append(requestId)

        for requestId in off_msgs:
            msg_heart = copy.deepcopy(msg_dict_off)
            msg_heart['status'] = 'running'
            msg_heart["request_id"] = requestId
            msg_heart = json.dumps(msg_heart, ensure_ascii=False)

            outStrList['success'] = '----- send heartBeat in Transfer success, msg:%s ' % (requestId)
            outStrList['failure'] = '----- kafka error when sending heartBeat in Transfer '
            outStrList['Refailure'] = '----- kafka error when Re-sending heartBeat in Transfer '
            send_kafka(producer, par_kafka, msg_heart, outStrList, fp_log,
                       line=sys._getframe().f_lineno, logger=logger, thread=thread)

        time.sleep(time_interval)
        time_ss1 = time.time()
        if time_ss1 - time_ss0 > 120:
            # Periodic liveness record, at most once every two minutes.
            _log(log_ctx, 'child process sleeping:%f s ' % (time_ss1 - time_ss0))
            time_ss0 = time_ss1


def test5(par):
    """Main transfer loop: upload results, publish Kafka messages, archive markers.

    Reads from `par`: 'indir', 'outdir', 'logdir', 'jsonDir', 'hearBeatTimeMs',
    'videoBakDir', 'labelnamesFile', 'ossPar' ('AId', 'ASt', 'Epoint',
    'bucketName'), 'vodPar' ('AId', 'ASt') and 'kafkaPar' ('boostServer',
    'topic').

    Starts `mintor_offline_ending` in a child process, then loops forever over
    the ``*_AI.txt`` files in 'indir' (files whose type is "其它" are skipped
    and left in place).  The function only exits through an exception, in
    which case the log file is closed and the heartbeat child is stopped.
    """
    indir, outdir, logdir, jsonDir = par['indir'], par['outdir'], par['logdir'], par['jsonDir']
    hearBeatTimeMs = par['hearBeatTimeMs']
    videoBakDir, ossPar, vodPar, kafkaPar = (
        par['videoBakDir'], par['ossPar'], par['vodPar'], par['kafkaPar'])

    logname = 'SendPics.log'
    thread = 'Send-tranfer-oss:main'
    fp_log = create_logFile(logdir=logdir, name=logname)
    logger = logdir.replace('/', '.') + '.' + logname
    log_ctx = (fp_log, logger, thread)
    _log(log_ctx, 'Send_tranfer_oss process starts')

    nameID_dic = getNamedic(par['labelnamesFile'].strip())

    # Parameters for the child process that emits heartbeat messages.
    parIn = {
        'indir': indir,
        'server': kafkaPar['boostServer'],
        'topic': kafkaPar['topic'],
        'fp_log': fp_log,
        'timeInterval': hearBeatTimeMs,
        'logger': logger,
    }
    HeartProcess = Process(target=mintor_offline_ending,
                           name='process-sendHeartOnly', args=(parIn,))
    HeartProcess.start()

    try:
        producer = _make_producer(kafkaPar['boostServer'])

        # Log in and prepare the OSS bucket.
        auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
        bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])

        # VOD.  The client returned by init_vod_client is not used below; the
        # call is kept in case it has side effects.
        init_vod_client(vodPar['AId'], vodPar['ASt'])
        uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])

        _log(log_ctx, 'Load Parameter over')
        time_b0 = time.time()
        while True:
            filelist = []
            for filename in _list_marker_files(indir):
                typename, requestId, onLineType = parse_filename_for_oss(
                    os.path.basename(filename))
                if typename in ["其它"]:
                    continue
                filelist.append(filename)

            if filelist:
                for filename in filelist:
                    # Parse the file name.
                    typename, requestId, onLineType = parse_filename_for_oss(
                        os.path.basename(filename))
                    filename_OR = filename.replace('_AI.', '_OR.')
                    taskInfos = lodaMsgInfos(jsonDir, requestId)

                    if typename not in ['结束', '超时结束']:
                        msg = _build_detection_msg(
                            bucket, filename, typename, onLineType,
                            taskInfos, nameID_dic, log_ctx)
                    else:
                        msg = _build_ending_msg(
                            uploader, videoBakDir, typename, requestId,
                            onLineType, taskInfos, log_ctx)

                    msg = json.dumps(msg, ensure_ascii=False)
                    producer = _send_with_retry(
                        producer, kafkaPar, msg, onLineType, log_ctx)

                    # Move the processed marker files to another folder so
                    # they are not picked up again on the next poll.
                    _move_to_dir(filename, outdir, log_ctx)
                    _move_to_dir(filename_OR, outdir, log_ctx)
                    print('-' * 50)
            else:
                time.sleep(1)
                time_b1 = time.time()
                if time_b1 - time_b0 > 120:
                    # Idle liveness record, at most once every two minutes.
                    _log(log_ctx, 'send main process sleeping')
                    time_b0 = time_b1
    finally:
        # Reached only when an exception escapes the loop: stop the heartbeat
        # child so it does not outlive the main loop, then release the log.
        HeartProcess.terminate()
        HeartProcess.join()
        fp_log.close()


if __name__ == '__main__':
    masterFile = "conf/send_oss.json"
    # open() raises FileNotFoundError (naming the path) if the config is missing.
    with open(masterFile, 'r') as fp:
        par = json.load(fp)
    test5(par)