DSPRiverInspection/DSP_Send_tranfer_oss.py

358 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PIL import Image
import numpy as np
import cv2
import base64
import io,os
import requests
import time,json
import string,random
import glob,string,sys
from multiprocessing import Process,Queue
import oss2,copy
from kafka import KafkaProducer, KafkaConsumer
from utilsK.sendUtils import *
from utilsK.masterUtils import create_logFile,wrtiteLog,writeELK_log,send_kafka
from voduploadsdk.UploadVideoRequest import UploadVideoRequest
from voduploadsdk.AliyunVodUtils import *
from voduploadsdk.AliyunVodUploader import AliyunVodUploader
import hashlib
from kafka.errors import kafka_errors
##for CeKanYuan
#10月21日通过图像名称判断是那个平台。方式不好。
#10月22日改成访问固定的地址从地址中读取平台的名称与地址。每隔2分钟访问一次。
#3月18日采用OSS阿里云存储桶
#platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
platform_query_url='SendLog/platformQuery.json'
api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
#api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
##这套名字,是联通的。
name_dic={
"排口":"入河、湖排口",
"排污口": "入河、湖排口",
"水生植被": "水生植物",
"漂浮物": "有大面积漂物",
"结束": "结束",
'其它' :'其它'
}
## for TH river
##这套代码是河长制度的。
'''
nameID_dic={
"排口":'00000',
"排污口": '8378',
"水生植被": '8380',
"漂浮物": '8368',
"结束":'9999',
"其它":'8888'
}
'''
msg_dict_off={
"request_id":"fflvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
"status":"running",#任务状态
"type":str(2),#消息类型 1实时 2离线
#"error":str(9999),#错误信息####
"error_code":"",#//错误编号
"error_msg":"",#//错误描述
"progress":"",
"results":[#问题结果
{
"original_url":"",#原图地址
"sign_url":"",#AI标记地址
"category_id":"",#分类标识
"description":"",#问题描述
"analyse_time":"",#时间戳
}
]
}
msg_dict_on={
"request_id":"nnlvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
"status":"running",#任务状态
"type":str(1),#消息类型 1实时 2离线
"error_code":"",#//错误编号
"error_msg":"",#//错误描述
"progressOn":"",
"results":[#问题结果
{
"original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
"sign_url":"",#识别后视频地址
}
]
}
def mintor_offline_ending(parIn):
indir,server,topic,fp_log = parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']
par_kafka={};par_kafka['server']=server;par_kafka['topic']=topic;
logger = parIn['logger'];thread='Send-tranfer-oss:mintor-offline-ending'
time_interval = parIn['timeInterval']
###轮询image_tmp的文件夹每10s一次一旦产生离线结束标志则不停地发送没30秒heartbeat信号。
producer = KafkaProducer(
bootstrap_servers=par_kafka['server'],#tencent yun
value_serializer=lambda v: v.encode('utf-8'),
metadata_max_age_ms=120000,
)
outStrList={}
writeELK_log(msg='child processs starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
time_ss0=time.time()
while True:
filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
filelist = filelist_AI
off_msgs=[]
for filename in filelist[0:]:
filename_base = os.path.basename(filename)
##解析文件名
typename,requestId,onLineType = parse_filename_for_oss(filename_base)
if (onLineType=='off') and (typename=='结束'):
off_msgs.append(requestId)
for requestId in off_msgs:
msg_heart = copy.deepcopy(msg_dict_off)
msg_heart['status']='running'
msg_heart["request_id"]=requestId
msg_heart = json.dumps(msg_heart, ensure_ascii=False)
outStrList['success']= '----- send heartBeat in Transfer success, msg:%s '%(requestId)
outStrList['failure']='----- kafka error when sending heartBeat in Transfer '
outStrList['Refailure']='----- kafka error when Re-sending heartBeat in Transfer '
send_kafka(producer,par_kafka,msg_heart,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
time.sleep(time_interval)
time_ss1=time.time()
if time_ss1 - time_ss0>120:
outstrs = 'child process sleeping:%f s '%(time_ss1-time_ss0)
writeELK_log(msg=outstrs,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
time_ss0=time_ss1
def test5(par):
indir,outdir,logdir,jsonDir = par['indir'],par['outdir'],par['logdir'],par['jsonDir']
hearBeatTimeMs = par['hearBeatTimeMs']
videoBakDir,ossPar,vodPar,kafkaPar = par['videoBakDir'], par['ossPar'],par['vodPar'],par['kafkaPar']
time0_0 = time.time();logname='SendPics.log';thread='Send-tranfer-oss:main'
fp_log=create_logFile(logdir=logdir,name=logname)
logger=logdir.replace('/','.')+'.'+logname
writeELK_log(msg='Send_tranfer_oss process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
nameID_dic=getNamedic(par['labelnamesFile'].strip())
parIn={};####给心跳信号发射的子进程参数
parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']=indir,kafkaPar['boostServer'],kafkaPar['topic'],fp_log
parIn['timeInterval'] = hearBeatTimeMs;parIn['logger']=logger
HeartProcess=Process(target=mintor_offline_ending,name='process-sendHeartOnly',args=(parIn,))
HeartProcess.start()
ifind=0
time0_0 = time.time()
producer = KafkaProducer(
bootstrap_servers=kafkaPar['boostServer'],#tencent yun
value_serializer=lambda v: v.encode('utf-8'),
metadata_max_age_ms=120000,
)
###登陆准备存储桶
auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
# Endpoint以杭州为例其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
##VOD
clt = init_vod_client(vodPar['AId'], vodPar['ASt'])
uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])
writeELK_log(msg='Load Parameter over',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
par_heart={};outStrList={}
time_b0=time.time()
while True:
filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
filelist=[]
for filename in filelist_AI:
filename_base = os.path.basename(filename)
typename,requestId,onLineType = parse_filename_for_oss(filename_base)
if typename in ["其它"]:
continue
filelist.append(filename)
if len(filelist)!=0:
time0 = time.time()
for filename in filelist[0:]:
filename_base = os.path.basename(filename)
##解析文件名
typename,requestId,onLineType = parse_filename_for_oss(filename_base)
##存储文件
filename_OR=filename.replace('_AI.','_OR.')
filename_AI_image = filename.replace('.txt','.jpg')
filename_OR_image = filename_OR.replace('.txt','.jpg')
taskInfos = lodaMsgInfos(jsonDir,requestId)
oss_dir = taskInfos['results_base_dir']
#if typename in ["排口","其它"]:
# continue
if typename not in ['结束','超时结束']:
time_s1 = time.time()
ObjectName_AI=os.path.join(oss_dir,os.path.basename(filename_AI_image))
ObjectName_OR=os.path.join(oss_dir,os.path.basename(filename_OR_image))
bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
ret2=bucket.put_object_from_file(ObjectName_OR, filename_OR_image)
outstr=' oss bucket upload %s %s %s '%('***'*3,'Send:',filename)
#outstr=wrtiteLog(fp_log,outstr);print( outstr);
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
msg = copy.deepcopy(msg_dict_off)
if onLineType!='off': msg['type']=str(1)
else: msg['type']=str(2)
msg['results'][0]['original_url']= ObjectName_OR
msg['results'][0]['sign_url']= ObjectName_AI
msg['results'][0]['category_id']= nameID_dic[typename]
msg['results'][0]['description']= typename
msg['results'][0]['analyse_time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
msg = update_json(taskInfos,msg)
time_s2 = time.time()
else:
time_s1 = time.time()
if onLineType!='off':
msg = copy.deepcopy(msg_dict_on)
msg["request_id"]=requestId ;msg['type']=str(1)
msg['results'][0]['original_url']= "yourAddess"
msg['results'][0]['sign_url']= "yourAddess"###最新的视频文件
upCnt=1;upLoaded=False
while upCnt<4:
try:
videoUrl= get_videoUurl(videoBakDir,requestId+'_AI.MP4')
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
videoId = uploader.uploadLocalVideo(uploadVideoRequest)
VideoId_AI=str(videoId['VideoId'])
videoUrl= get_videoUurl(videoBakDir,requestId+'_OR.MP4')
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
videoId = uploader.uploadLocalVideo(uploadVideoRequest)
VideoId_OR=str(videoId['VideoId'])
outstr=VideoId_OR+','+VideoId_AI
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
msg['results'][0]['sign_url']=VideoId_AI
msg['results'][0]['original_url']=VideoId_OR
upCnt=4;upLoaded=True
except Exception as e:
writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
upCnt+=1;upLoaded=False
if not upLoaded:
msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
else:
msg = copy.deepcopy(msg_dict_off)
msg['type']=str(2)
msg["request_id"]=requestId
msg['results'][0]['original_url']= taskInfos['original_url']
videoUrl= get_videoUurl(videoBakDir,requestId+'.MP4')
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
upCnt=1;upLoaded=False
while upCnt<4:
try:
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
videoId = uploader.uploadLocalVideo(uploadVideoRequest)
outstr=' oss upload video over %s '%(str(videoId['VideoId']))
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
msg['results'][0]['sign_url']= str(videoId['VideoId'])###最新的视频文件
upCnt=4;upLoaded=True
except Exception as e:
writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
upCnt+=1;upLoaded=False
if not upLoaded:
msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
if upLoaded:
if typename=='结束': msg["status"]="success"
else: msg["status"]="timeout"
else:
msg["status"]='failed'
time_s2 = time.time()
msg = json.dumps(msg, ensure_ascii=False)
future = producer.send(
kafkaPar['topic'],
msg
)
try:
record_metadata = future.get()
outstr='kafka send:%s msg:%s producer status:%s'%(onLineType,msg,producer.bootstrap_connected())
#outstr=wrtiteLog(fp_log,outstr);print( outstr);
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
except Exception as e:
outstr='kafka ERROR:%s'%(str(e))
#outstr=wrtiteLog(fp_log,outstr);print( outstr);
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger)
producer.close()
producer = KafkaProducer(
bootstrap_servers=kafkaPar['boostServer'],#tencent yun
value_serializer=lambda v: v.encode('utf-8')
)
try:
future = producer.send(kafkaPar['topic'], msg).get()
except Exception as e:
outstr='kafka resend ERROR:%s'%(str(e))
#poutstr=wrtiteLog(fp_log,outstr);print( outstr);
writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='ERROR',line=sys._getframe().f_lineno,logger=logger)
time_s3 = time.time()
##上传后的图片,移走到另外一个文件夹###
cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
time_s4 = time.time()
print('-'*50)
else:
time.sleep(1)
time_b1=time.time()
if time_b1-time_b0>120:
writeELK_log(msg='send main process sleeping',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
time_b0=time_b1
fp_log.close()
if __name__=='__main__':
masterFile="conf/send_oss.json"
assert os.path.exists(masterFile)
with open(masterFile,'r') as fp:
par=json.load(fp)
test5(par)