|
- from PIL import Image
- import numpy as np
- import cv2
- import base64
- import io,os
- import requests
- import time,json
- import string,random
- import glob,string,sys
- from multiprocessing import Process,Queue
- import oss2,copy
- from kafka import KafkaProducer, KafkaConsumer
- from utilsK.sendUtils import *
-
- from utilsK.masterUtils import create_logFile,wrtiteLog,writeELK_log,send_kafka
- from voduploadsdk.UploadVideoRequest import UploadVideoRequest
- from voduploadsdk.AliyunVodUtils import *
- from voduploadsdk.AliyunVodUploader import AliyunVodUploader
- import hashlib
- from kafka.errors import kafka_errors
##for CeKanYuan
# Oct 21: platform was inferred from the image file name. Not a good approach.
# Oct 22: changed to poll a fixed address every 2 minutes and read the platform name/address from it.
# Mar 18: switched to Aliyun OSS bucket storage.
#platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
platform_query_url='SendLog/platformQuery.json'
api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
#api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
-
## This naming set is the one used by China Unicom.
# Maps the detector's class name to the display name expected by the platform.
name_dic={
    "排口":"入河、湖排口",
    "排污口": "入河、湖排口",
    "水生植被": "水生植物",
    "漂浮物": "有大面积漂物",
    "结束": "结束",
    '其它' :'其它'
}
## for TH river
## This mapping was for the "river chief" (河长制) system; kept for reference.
## At runtime the live mapping comes from getNamedic() in test5 instead.
'''
nameID_dic={
"排口":'00000',
"排污口": '8378',
"水生植被": '8380',
"漂浮物": '8368',
"结束":'9999',
"其它":'8888'
}
'''
-
# Kafka message template for OFFLINE (type=2) detection results.
# Deep-copied before each use; never mutated in place.
msg_dict_off={
    "request_id":"fflvgyntTsZCamqjuLArkiSYIbKXEeWx",# message ID
    "status":"running",# task status
    "type":str(2),# message type: 1 = real-time, 2 = offline
    #"error":str(9999),# error info ####
    "error_code":"",# error number
    "error_msg":"",# error description
    "progress":"",
    "results":[# detected problems
        {
            "original_url":"",# address of the original image
            "sign_url":"",# address of the AI-annotated image
            "category_id":"",# category identifier
            "description":"",# problem description
            "analyse_time":"",# timestamp
        }
    ]
}
-
# Kafka message template for REAL-TIME (type=1) detection results.
# Deep-copied before each use; never mutated in place.
msg_dict_on={
    "request_id":"nnlvgyntTsZCamqjuLArkiSYIbKXEeWx",# message ID
    "status":"running",# task status
    "type":str(1),# message type: 1 = real-time, 2 = offline
    "error_code":"",# error number
    "error_msg":"",# error description
    "progressOn":"",
    "results":[# detected problems
        {
            "original_url":"",# original video address (empty for offline; required for real-time)
            "sign_url":"",# address of the annotated video
        }
    ]
}
-
-
-
def mintor_offline_ending(parIn):
    """Heartbeat child process for offline jobs that have reached their end marker.

    Polls *indir* every ``parIn['timeInterval']`` seconds for ``*_AI.txt``
    marker files; for every offline ('off') request whose marker carries the
    end type name '结束', publishes a "running" heartbeat message (built from
    ``msg_dict_off``) to Kafka so the consumer keeps the task alive while the
    main process finishes uploading.  Runs forever; intended to be started as
    a ``multiprocessing.Process`` target (see ``test5``).

    parIn keys: 'indir' (watched folder), 'server' (Kafka bootstrap),
    'topic', 'fp_log' (open log file), 'logger' (ELK logger name),
    'timeInterval' (poll period, seconds).
    """
    indir, server, topic, fp_log = parIn['indir'], parIn['server'], parIn['topic'], parIn['fp_log']
    par_kafka = {'server': server, 'topic': topic}
    logger = parIn['logger']
    thread = 'Send-tranfer-oss:mintor-offline-ending'
    time_interval = parIn['timeInterval']

    producer = KafkaProducer(
        bootstrap_servers=par_kafka['server'],  # tencent yun
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000,  # refresh metadata so long-lived producer survives broker moves
    )
    outStrList = {}
    writeELK_log(msg='child process starts', fp=fp_log, thread=thread, level='INFO',
                 line=sys._getframe().f_lineno, logger=logger)
    time_ss0 = time.time()
    while True:
        # Newest-last ordering by mtime, matching the main loop's scan.
        filelist = sorted(glob.glob('%s/*_AI.txt' % (indir)), key=os.path.getmtime)

        # Collect request IDs of offline jobs whose "end" marker has appeared.
        # parse_filename_for_oss comes from utilsK.sendUtils (star import) --
        # assumed to return (typename, requestId, onLineType); TODO confirm.
        off_msgs = []
        for filename in filelist:
            typename, requestId, onLineType = parse_filename_for_oss(os.path.basename(filename))
            if (onLineType == 'off') and (typename == '结束'):
                off_msgs.append(requestId)

        for requestId in off_msgs:
            msg_heart = copy.deepcopy(msg_dict_off)
            msg_heart['status'] = 'running'
            msg_heart["request_id"] = requestId
            msg_heart = json.dumps(msg_heart, ensure_ascii=False)

            # Per-outcome log strings consumed by send_kafka.
            outStrList['success'] = '----- send heartBeat in Transfer success, msg:%s ' % (requestId)
            outStrList['failure'] = '----- kafka error when sending heartBeat in Transfer '
            outStrList['Refailure'] = '----- kafka error when Re-sending heartBeat in Transfer '

            send_kafka(producer, par_kafka, msg_heart, outStrList, fp_log,
                       line=sys._getframe().f_lineno, logger=logger, thread=thread)

        time.sleep(time_interval)
        # Emit a liveness line at most every 2 minutes.
        time_ss1 = time.time()
        if time_ss1 - time_ss0 > 120:
            outstrs = 'child process sleeping:%f s ' % (time_ss1 - time_ss0)
            writeELK_log(msg=outstrs, fp=fp_log, thread=thread, level='INFO',
                         line=sys._getframe().f_lineno, logger=logger)
            time_ss0 = time_ss1
-
-
-
-
-
def test5(par):
    """Main transfer loop: ship AI-detection artifacts to OSS/VOD and publish results to Kafka.

    Polls ``par['indir']`` for ``*_AI.txt`` marker files produced by the
    detector.  For each problem detection it uploads the original and
    annotated images to an OSS bucket and sends an offline-style result
    message; for end markers ('结束'/'超时结束') it uploads the backed-up
    video(s) to Aliyun VOD (up to 3 attempts) and sends a final status
    message.  Also spawns a child process (mintor_offline_ending) that keeps
    offline tasks alive with heartbeats.  Runs forever.

    par: dict loaded from conf/send_oss.json with keys 'indir', 'outdir',
    'logdir', 'jsonDir', 'hearBeatTimeMs', 'videoBakDir', 'ossPar',
    'vodPar', 'kafkaPar', 'labelnamesFile'.
    """
    indir,outdir,logdir,jsonDir = par['indir'],par['outdir'],par['logdir'],par['jsonDir']
    hearBeatTimeMs = par['hearBeatTimeMs']
    videoBakDir,ossPar,vodPar,kafkaPar = par['videoBakDir'], par['ossPar'],par['vodPar'],par['kafkaPar']

    time0_0 = time.time();logname='SendPics.log';thread='Send-tranfer-oss:main'
    fp_log=create_logFile(logdir=logdir,name=logname)

    logger=logdir.replace('/','.')+'.'+logname
    writeELK_log(msg='Send_tranfer_oss process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
    # Live category-name -> platform-ID mapping (replaces the commented-out
    # module-level nameID_dic). getNamedic is from utilsK.sendUtils -- TODO confirm.
    nameID_dic=getNamedic(par['labelnamesFile'].strip())

    parIn={};#### parameters for the heartbeat-emitting child process
    parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']=indir,kafkaPar['boostServer'],kafkaPar['topic'],fp_log
    parIn['timeInterval'] = hearBeatTimeMs;parIn['logger']=logger
    HeartProcess=Process(target=mintor_offline_ending,name='process-sendHeartOnly',args=(parIn,))
    HeartProcess.start()

    ifind=0
    time0_0 = time.time()

    producer = KafkaProducer(
        bootstrap_servers=kafkaPar['boostServer'],#tencent yun
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000,
    )

    ### Log in and prepare the OSS bucket.
    auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
    # Endpoint example uses Hangzhou; fill in the actual Region as appropriate.
    bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])

    ##VOD (Aliyun video upload). NOTE(review): clt appears unused below -- confirm.
    clt = init_vod_client(vodPar['AId'], vodPar['ASt'])
    uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])

    writeELK_log(msg='Load Parameter over',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
    par_heart={};outStrList={}
    time_b0=time.time()
    while True:
        # Scan marker files oldest-first; drop the catch-all '其它' category.
        filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
        filelist=[]
        for filename in filelist_AI:
            filename_base = os.path.basename(filename)
            typename,requestId,onLineType = parse_filename_for_oss(filename_base)
            if typename in ["其它"]:
                continue
            filelist.append(filename)

        if len(filelist)!=0:
            time0 = time.time()
            for filename in filelist[0:]:
                filename_base = os.path.basename(filename)
                ## parse the file name into (category, request ID, online/offline)
                typename,requestId,onLineType = parse_filename_for_oss(filename_base)

                ## derived artifact paths: *_OR = original, *.jpg = image twins of the .txt markers
                filename_OR=filename.replace('_AI.','_OR.')

                filename_AI_image = filename.replace('.txt','.jpg')
                filename_OR_image = filename_OR.replace('.txt','.jpg')

                # Task metadata persisted earlier under jsonDir, keyed by request ID.
                # lodaMsgInfos is from utilsK.sendUtils (star import) -- TODO confirm.
                taskInfos = lodaMsgInfos(jsonDir,requestId)
                oss_dir = taskInfos['results_base_dir']

                #if typename in ["排口","其它"]:
                #    continue
                if typename not in ['结束','超时结束']:
                    # --- problem detection: upload both images to OSS, build a result msg ---
                    time_s1 = time.time()
                    ObjectName_AI=os.path.join(oss_dir,os.path.basename(filename_AI_image))
                    ObjectName_OR=os.path.join(oss_dir,os.path.basename(filename_OR_image))
                    bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
                    ret2=bucket.put_object_from_file(ObjectName_OR, filename_OR_image)

                    outstr=' oss bucket upload %s %s %s '%('***'*3,'Send:',filename)
                    #outstr=wrtiteLog(fp_log,outstr);print( outstr);
                    writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)

                    msg = copy.deepcopy(msg_dict_off)
                    if onLineType!='off': msg['type']=str(1)
                    else: msg['type']=str(2)
                    msg['results'][0]['original_url']= ObjectName_OR
                    msg['results'][0]['sign_url']= ObjectName_AI
                    msg['results'][0]['category_id']= nameID_dic[typename]
                    msg['results'][0]['description']= typename
                    msg['results'][0]['analyse_time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    # update_json merges taskInfos into msg -- assumed; TODO confirm.
                    msg = update_json(taskInfos,msg)
                    time_s2 = time.time()
                else:
                    # --- end marker ('结束' normal / '超时结束' timeout): upload video(s) to VOD ---
                    time_s1 = time.time()
                    if onLineType!='off':
                        # Real-time task: upload both the annotated and the original video.
                        msg = copy.deepcopy(msg_dict_on)
                        msg["request_id"]=requestId ;msg['type']=str(1)
                        msg['results'][0]['original_url']= "yourAddess"
                        msg['results'][0]['sign_url']= "yourAddess"# the latest video file
                        upCnt=1;upLoaded=False
                        while upCnt<4:  # at most 3 upload attempts
                            try:
                                videoUrl= get_videoUurl(videoBakDir,requestId+'_AI.MP4')
                                assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
                                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                                videoId = uploader.uploadLocalVideo(uploadVideoRequest)

                                VideoId_AI=str(videoId['VideoId'])

                                videoUrl= get_videoUurl(videoBakDir,requestId+'_OR.MP4')
                                assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)

                                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                                videoId = uploader.uploadLocalVideo(uploadVideoRequest)
                                VideoId_OR=str(videoId['VideoId'])

                                outstr=VideoId_OR+','+VideoId_AI
                                writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                                # The message carries VOD VideoIds, not URLs.
                                msg['results'][0]['sign_url']=VideoId_AI
                                msg['results'][0]['original_url']=VideoId_OR
                                upCnt=4;upLoaded=True
                            except Exception as e:
                                writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
                                upCnt+=1;upLoaded=False
                        if not upLoaded:
                            msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
                        # NOTE(review): on the real-time path msg['status'] stays
                        # "running" even on success -- confirm this is intended.
                    else:
                        # Offline task: upload the single backed-up video and set final status.
                        msg = copy.deepcopy(msg_dict_off)
                        msg['type']=str(2)
                        msg["request_id"]=requestId
                        msg['results'][0]['original_url']= taskInfos['original_url']
                        videoUrl= get_videoUurl(videoBakDir,requestId+'.MP4')
                        assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
                        upCnt=1;upLoaded=False
                        while upCnt<4:  # at most 3 upload attempts
                            try:
                                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                                videoId = uploader.uploadLocalVideo(uploadVideoRequest)
                                outstr=' oss upload video over %s '%(str(videoId['VideoId']))
                                writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                                msg['results'][0]['sign_url']= str(videoId['VideoId'])# the latest video file
                                upCnt=4;upLoaded=True
                            except Exception as e:
                                writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
                                upCnt+=1;upLoaded=False
                        if not upLoaded:
                            msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
                        if upLoaded:
                            if typename=='结束': msg["status"]="success"
                            else: msg["status"]="timeout"
                        else:
                            msg["status"]='failed'
                    time_s2 = time.time()

                # Publish the message; on failure rebuild the producer once and resend.
                msg = json.dumps(msg, ensure_ascii=False)
                future = producer.send(
                    kafkaPar['topic'],
                    msg
                )

                try:
                    record_metadata = future.get()
                    outstr='kafka send:%s msg:%s producer status:%s'%(onLineType,msg,producer.bootstrap_connected())
                    #outstr=wrtiteLog(fp_log,outstr);print( outstr);
                    writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                except Exception as e:
                    outstr='kafka ERROR:%s'%(str(e))
                    #outstr=wrtiteLog(fp_log,outstr);print( outstr);
                    writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger)
                    producer.close()

                    producer = KafkaProducer(
                        bootstrap_servers=kafkaPar['boostServer'],#tencent yun
                        value_serializer=lambda v: v.encode('utf-8')
                    )
                    try:
                        future = producer.send(kafkaPar['topic'], msg).get()
                    except Exception as e:
                        outstr='kafka resend ERROR:%s'%(str(e))
                        #poutstr=wrtiteLog(fp_log,outstr);print( outstr);
                        writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='ERROR',line=sys._getframe().f_lineno,logger=logger)

                time_s3 = time.time()

                ## Move the uploaded marker files to another folder so they are not re-sent.
                cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
                cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
                time_s4 = time.time()
                print('-'*50)

        else:
            time.sleep(1)
        # Liveness log at most every 2 minutes.
        time_b1=time.time()
        if time_b1-time_b0>120:
            writeELK_log(msg='send main process sleeping',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
            time_b0=time_b1
    # NOTE(review): unreachable -- the while True loop above never breaks.
    fp_log.close()
-
if __name__=='__main__':

    # Load the runtime configuration and start the main transfer loop.
    masterFile="conf/send_oss.json"
    # Was `assert os.path.exists(...)`: asserts are stripped under `python -O`,
    # so raise explicitly. Read as UTF-8 since the config may carry Chinese labels.
    if not os.path.exists(masterFile):
        raise FileNotFoundError('config file not found: %s' % masterFile)
    with open(masterFile, 'r', encoding='utf-8') as fp:
        par = json.load(fp)

    test5(par)
|