358 lines
16 KiB
Python
358 lines
16 KiB
Python
from PIL import Image
|
||
import numpy as np
|
||
import cv2
|
||
import base64
|
||
import io,os
|
||
import requests
|
||
import time,json
|
||
import string,random
|
||
import glob,string,sys
|
||
from multiprocessing import Process,Queue
|
||
import oss2,copy
|
||
from kafka import KafkaProducer, KafkaConsumer
|
||
from utilsK.sendUtils import *
|
||
|
||
from utilsK.masterUtils import create_logFile,wrtiteLog,writeELK_log,send_kafka
|
||
from voduploadsdk.UploadVideoRequest import UploadVideoRequest
|
||
from voduploadsdk.AliyunVodUtils import *
|
||
from voduploadsdk.AliyunVodUploader import AliyunVodUploader
|
||
import hashlib
|
||
from kafka.errors import kafka_errors
|
||
##for CeKanYuan
|
||
#10月21日,通过图像名称判断,是那个平台。方式不好。
|
||
#10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。
|
||
#3月18日,采用OSS阿里云存储桶
|
||
#platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
|
||
platform_query_url='SendLog/platformQuery.json'
|
||
api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
|
||
#api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
|
||
|
||
##这套名字,是联通的。
|
||
name_dic={
|
||
"排口":"入河、湖排口",
|
||
"排污口": "入河、湖排口",
|
||
"水生植被": "水生植物",
|
||
"漂浮物": "有大面积漂物",
|
||
"结束": "结束",
|
||
'其它' :'其它'
|
||
}
|
||
## for TH river
|
||
##这套代码是河长制度的。
|
||
'''
|
||
nameID_dic={
|
||
"排口":'00000',
|
||
"排污口": '8378',
|
||
"水生植被": '8380',
|
||
"漂浮物": '8368',
|
||
"结束":'9999',
|
||
"其它":'8888'
|
||
}
|
||
'''
|
||
|
||
msg_dict_off={
|
||
"request_id":"fflvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
|
||
"status":"running",#任务状态
|
||
"type":str(2),#消息类型 1:实时 2:离线
|
||
#"error":str(9999),#错误信息####
|
||
"error_code":"",#//错误编号
|
||
"error_msg":"",#//错误描述
|
||
"progress":"",
|
||
"results":[#问题结果
|
||
{
|
||
"original_url":"",#原图地址
|
||
"sign_url":"",#AI标记地址
|
||
"category_id":"",#分类标识
|
||
"description":"",#问题描述
|
||
"analyse_time":"",#时间戳
|
||
}
|
||
]
|
||
}
|
||
|
||
msg_dict_on={
|
||
"request_id":"nnlvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
|
||
"status":"running",#任务状态
|
||
"type":str(1),#消息类型 1:实时 2:离线
|
||
"error_code":"",#//错误编号
|
||
"error_msg":"",#//错误描述
|
||
"progressOn":"",
|
||
"results":[#问题结果
|
||
{
|
||
"original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
|
||
"sign_url":"",#识别后视频地址
|
||
}
|
||
]
|
||
}
|
||
|
||
|
||
|
||
def mintor_offline_ending(parIn):
|
||
|
||
indir,server,topic,fp_log = parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']
|
||
par_kafka={};par_kafka['server']=server;par_kafka['topic']=topic;
|
||
logger = parIn['logger'];thread='Send-tranfer-oss:mintor-offline-ending'
|
||
time_interval = parIn['timeInterval']
|
||
###轮询image_tmp的文件夹,每10s一次,一旦产生离线结束标志,则不停地发送没30秒heartbeat信号。
|
||
producer = KafkaProducer(
|
||
bootstrap_servers=par_kafka['server'],#tencent yun
|
||
value_serializer=lambda v: v.encode('utf-8'),
|
||
metadata_max_age_ms=120000,
|
||
)
|
||
outStrList={}
|
||
writeELK_log(msg='child processs starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
time_ss0=time.time()
|
||
while True:
|
||
|
||
filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
|
||
filelist = filelist_AI
|
||
|
||
off_msgs=[]
|
||
|
||
for filename in filelist[0:]:
|
||
filename_base = os.path.basename(filename)
|
||
##解析文件名
|
||
typename,requestId,onLineType = parse_filename_for_oss(filename_base)
|
||
if (onLineType=='off') and (typename=='结束'):
|
||
off_msgs.append(requestId)
|
||
|
||
for requestId in off_msgs:
|
||
msg_heart = copy.deepcopy(msg_dict_off)
|
||
msg_heart['status']='running'
|
||
msg_heart["request_id"]=requestId
|
||
msg_heart = json.dumps(msg_heart, ensure_ascii=False)
|
||
|
||
outStrList['success']= '----- send heartBeat in Transfer success, msg:%s '%(requestId)
|
||
outStrList['failure']='----- kafka error when sending heartBeat in Transfer '
|
||
outStrList['Refailure']='----- kafka error when Re-sending heartBeat in Transfer '
|
||
|
||
|
||
send_kafka(producer,par_kafka,msg_heart,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
|
||
|
||
time.sleep(time_interval)
|
||
time_ss1=time.time()
|
||
if time_ss1 - time_ss0>120:
|
||
outstrs = 'child process sleeping:%f s '%(time_ss1-time_ss0)
|
||
writeELK_log(msg=outstrs,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
time_ss0=time_ss1
|
||
|
||
|
||
|
||
|
||
|
||
|
||
def test5(par):
|
||
|
||
indir,outdir,logdir,jsonDir = par['indir'],par['outdir'],par['logdir'],par['jsonDir']
|
||
hearBeatTimeMs = par['hearBeatTimeMs']
|
||
videoBakDir,ossPar,vodPar,kafkaPar = par['videoBakDir'], par['ossPar'],par['vodPar'],par['kafkaPar']
|
||
|
||
|
||
|
||
time0_0 = time.time();logname='SendPics.log';thread='Send-tranfer-oss:main'
|
||
fp_log=create_logFile(logdir=logdir,name=logname)
|
||
|
||
logger=logdir.replace('/','.')+'.'+logname
|
||
writeELK_log(msg='Send_tranfer_oss process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
nameID_dic=getNamedic(par['labelnamesFile'].strip())
|
||
|
||
parIn={};####给心跳信号发射的子进程参数
|
||
parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']=indir,kafkaPar['boostServer'],kafkaPar['topic'],fp_log
|
||
parIn['timeInterval'] = hearBeatTimeMs;parIn['logger']=logger
|
||
HeartProcess=Process(target=mintor_offline_ending,name='process-sendHeartOnly',args=(parIn,))
|
||
HeartProcess.start()
|
||
|
||
|
||
ifind=0
|
||
time0_0 = time.time()
|
||
|
||
producer = KafkaProducer(
|
||
bootstrap_servers=kafkaPar['boostServer'],#tencent yun
|
||
value_serializer=lambda v: v.encode('utf-8'),
|
||
metadata_max_age_ms=120000,
|
||
)
|
||
|
||
|
||
|
||
|
||
###登陆准备存储桶
|
||
auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
|
||
# Endpoint以杭州为例,其它Region请按实际情况填写。
|
||
bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
|
||
|
||
##VOD
|
||
clt = init_vod_client(vodPar['AId'], vodPar['ASt'])
|
||
uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])
|
||
|
||
|
||
writeELK_log(msg='Load Parameter over',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
par_heart={};outStrList={}
|
||
time_b0=time.time()
|
||
while True:
|
||
|
||
filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
|
||
filelist=[]
|
||
for filename in filelist_AI:
|
||
filename_base = os.path.basename(filename)
|
||
typename,requestId,onLineType = parse_filename_for_oss(filename_base)
|
||
if typename in ["其它"]:
|
||
continue
|
||
filelist.append(filename)
|
||
|
||
if len(filelist)!=0:
|
||
time0 = time.time()
|
||
for filename in filelist[0:]:
|
||
filename_base = os.path.basename(filename)
|
||
##解析文件名
|
||
typename,requestId,onLineType = parse_filename_for_oss(filename_base)
|
||
|
||
##存储文件
|
||
filename_OR=filename.replace('_AI.','_OR.')
|
||
|
||
filename_AI_image = filename.replace('.txt','.jpg')
|
||
filename_OR_image = filename_OR.replace('.txt','.jpg')
|
||
|
||
taskInfos = lodaMsgInfos(jsonDir,requestId)
|
||
oss_dir = taskInfos['results_base_dir']
|
||
|
||
#if typename in ["排口","其它"]:
|
||
# continue
|
||
if typename not in ['结束','超时结束']:
|
||
|
||
time_s1 = time.time()
|
||
ObjectName_AI=os.path.join(oss_dir,os.path.basename(filename_AI_image))
|
||
ObjectName_OR=os.path.join(oss_dir,os.path.basename(filename_OR_image))
|
||
bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
|
||
ret2=bucket.put_object_from_file(ObjectName_OR, filename_OR_image)
|
||
|
||
outstr=' oss bucket upload %s %s %s '%('***'*3,'Send:',filename)
|
||
#outstr=wrtiteLog(fp_log,outstr);print( outstr);
|
||
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
|
||
msg = copy.deepcopy(msg_dict_off)
|
||
if onLineType!='off': msg['type']=str(1)
|
||
else: msg['type']=str(2)
|
||
msg['results'][0]['original_url']= ObjectName_OR
|
||
msg['results'][0]['sign_url']= ObjectName_AI
|
||
msg['results'][0]['category_id']= nameID_dic[typename]
|
||
msg['results'][0]['description']= typename
|
||
msg['results'][0]['analyse_time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
msg = update_json(taskInfos,msg)
|
||
time_s2 = time.time()
|
||
else:
|
||
time_s1 = time.time()
|
||
if onLineType!='off':
|
||
msg = copy.deepcopy(msg_dict_on)
|
||
msg["request_id"]=requestId ;msg['type']=str(1)
|
||
msg['results'][0]['original_url']= "yourAddess"
|
||
msg['results'][0]['sign_url']= "yourAddess"###最新的视频文件
|
||
upCnt=1;upLoaded=False
|
||
while upCnt<4:
|
||
try:
|
||
videoUrl= get_videoUurl(videoBakDir,requestId+'_AI.MP4')
|
||
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
|
||
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
|
||
videoId = uploader.uploadLocalVideo(uploadVideoRequest)
|
||
|
||
VideoId_AI=str(videoId['VideoId'])
|
||
|
||
videoUrl= get_videoUurl(videoBakDir,requestId+'_OR.MP4')
|
||
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
|
||
|
||
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
|
||
videoId = uploader.uploadLocalVideo(uploadVideoRequest)
|
||
VideoId_OR=str(videoId['VideoId'])
|
||
|
||
outstr=VideoId_OR+','+VideoId_AI
|
||
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
msg['results'][0]['sign_url']=VideoId_AI
|
||
msg['results'][0]['original_url']=VideoId_OR
|
||
upCnt=4;upLoaded=True
|
||
except Exception as e:
|
||
writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
|
||
upCnt+=1;upLoaded=False
|
||
if not upLoaded:
|
||
msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
|
||
|
||
|
||
else:
|
||
msg = copy.deepcopy(msg_dict_off)
|
||
msg['type']=str(2)
|
||
msg["request_id"]=requestId
|
||
msg['results'][0]['original_url']= taskInfos['original_url']
|
||
videoUrl= get_videoUurl(videoBakDir,requestId+'.MP4')
|
||
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
|
||
upCnt=1;upLoaded=False
|
||
while upCnt<4:
|
||
try:
|
||
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
|
||
videoId = uploader.uploadLocalVideo(uploadVideoRequest)
|
||
outstr=' oss upload video over %s '%(str(videoId['VideoId']))
|
||
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
msg['results'][0]['sign_url']= str(videoId['VideoId'])###最新的视频文件
|
||
upCnt=4;upLoaded=True
|
||
except Exception as e:
|
||
writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
|
||
upCnt+=1;upLoaded=False
|
||
if not upLoaded:
|
||
msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
|
||
if upLoaded:
|
||
if typename=='结束': msg["status"]="success"
|
||
else: msg["status"]="timeout"
|
||
else:
|
||
msg["status"]='failed'
|
||
time_s2 = time.time()
|
||
|
||
msg = json.dumps(msg, ensure_ascii=False)
|
||
future = producer.send(
|
||
kafkaPar['topic'],
|
||
msg
|
||
)
|
||
|
||
|
||
try:
|
||
record_metadata = future.get()
|
||
outstr='kafka send:%s msg:%s producer status:%s'%(onLineType,msg,producer.bootstrap_connected())
|
||
#outstr=wrtiteLog(fp_log,outstr);print( outstr);
|
||
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
except Exception as e:
|
||
outstr='kafka ERROR:%s'%(str(e))
|
||
#outstr=wrtiteLog(fp_log,outstr);print( outstr);
|
||
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger)
|
||
producer.close()
|
||
|
||
producer = KafkaProducer(
|
||
bootstrap_servers=kafkaPar['boostServer'],#tencent yun
|
||
value_serializer=lambda v: v.encode('utf-8')
|
||
)
|
||
try:
|
||
future = producer.send(kafkaPar['topic'], msg).get()
|
||
except Exception as e:
|
||
outstr='kafka resend ERROR:%s'%(str(e))
|
||
#poutstr=wrtiteLog(fp_log,outstr);print( outstr);
|
||
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='ERROR',line=sys._getframe().f_lineno,logger=logger)
|
||
|
||
|
||
time_s3 = time.time()
|
||
|
||
##上传后的图片,移走到另外一个文件夹###
|
||
cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
|
||
cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
|
||
time_s4 = time.time()
|
||
print('-'*50)
|
||
|
||
else:
|
||
time.sleep(1)
|
||
time_b1=time.time()
|
||
if time_b1-time_b0>120:
|
||
writeELK_log(msg='send main process sleeping',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
|
||
time_b0=time_b1
|
||
fp_log.close()
|
||
|
||
if __name__=='__main__':
|
||
|
||
masterFile="conf/send_oss.json"
|
||
assert os.path.exists(masterFile)
|
||
with open(masterFile,'r') as fp:
|
||
par=json.load(fp)
|
||
|
||
test5(par)
|