DSPRiverInspection/oss.py

304 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PIL import Image
import numpy as np
import cv2
import base64
import io,os
import requests
import time,json
import string,random
import glob,string,sys
from multiprocessing import Process,Queue
import oss2
from kafka import KafkaProducer, KafkaConsumer
##for CeKanYuan
#10月21日通过图像名称判断是那个平台。方式不好。
#10月22日改成访问固定的地址从地址中读取平台的名称与地址。每隔2分钟访问一次。
#3月18日采用OSS阿里云存储桶
#platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
platform_query_url='SendLog/platformQuery.json'
api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
#api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
##这套名字,是联通的。
name_dic={
"排口":"入河、湖排口",
"疑似污口": "入河、湖排口",
"水生植被": "水生植物",
"漂浮物": "有大面积漂物",
"结束": "结束",
'其它' :'其它'
}
## for TH river
##这套代码是河长制度的。
nameID_dic={
"排口":'00000',
"疑似污口": '8378',
"水生植被": '8380',
"漂浮物": '8368',
"结束":'9999',
'其它':'8888'
}
def get_time(filename):
#2021-10-09-11-44-51_frame-598-720_type-水生植被.jpg
sps=filename.strip().split('_')[0]
tsps=sps.split('-')
return '%s-%s-%s %s:%s:%s'%(tsps[0],tsps[1],tsps[2],tsps[3],tsps[4],tsps[5])
def get_ms(time0,time1):
str_time ='%.2f ms'%((time1-time0)*1000)
return str_time
def get_urls( platform_query_url,fp_log ):
try:
if os.path.exists(platform_query_url):
#print('###line49')
with open('SendLog/platformQuery.json','r') as fp:
res = json.load(fp)
else:
res = requests.get(platform_query_url,timeout=10).json()
#print('###line54')
questionUrl = res['data']['questionUrl'] ###直播流时,问题图片的推送地址
offlineUrl = res['data']['offlineUrl'] ###http离线视频时问题图片的推送地址
except Exception as ee:
timestr=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print('###### %s: file:send_transfer: error %s ,url:%s #####'%(timestr,ee,platform_query_url))
outstr = '\n %s ###### get url platform error : update error:%s , url:%s'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,ee,platform_query_url)
fp_log.write(outstr);fp_log.flush()
questionUrl="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
offlineUrl ="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
return questionUrl,offlineUrl
def parse_filename(filename_base):
#etc:2022-01-13-16-04-17_frame-823-1440_type-水生植被_hgYFEulc0dPIrG1S_s-off-XJRW20220113154959_AI.jpg
uid =filename_base.split('.')[0].split('_')[3].strip()
sourceType=filename_base.split('_')[4].split('-')[1]
sourceId=filename_base.split('_')[4].split('-')[2]
typename=filename_base.split('.')[0].split('_')[2].split('-')[1].strip()
return uid,sourceType,sourceId,typename
def b64encode_function(filename, filename_OR):
if os.path.exists(filename):
image_ori=cv2.imread(filename)
image_ori_OR=cv2.imread(filename_OR)
else:
image_ori = filename.copy()
image_ori_OR = image_ori_OR.copy()
image_pngcode = cv2.imencode('.jpg',image_ori)[-1]
image_pngcode_OR = cv2.imencode('.jpg',image_ori_OR)[-1]
image_code = str(base64.b64encode(image_pngcode))[2:-1]
image_code_OR = str(base64.b64encode(image_pngcode_OR))[2:-1]
return image_code, image_code_OR
def JsonSend(parIn):
fp_log = parIn['fp_log']
try:
response=requests.post(parIn['api'],json=parIn['input_'],timeout=10).json()
t3 = time.time()
print('\n file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s, size:%.2f M \n'%(parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],api,response['code'],parIn['sizeImage']))
outstr = '%s file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s,size:%.2f M ,%s\n'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],parIn['api'],response['code'],parIn['sizeImage'],parIn['dic_str'])
fp_log.write(outstr);fp_log.flush()
except Exception as ee:
print('\n ######file:%s: upload error:%s,size:%.2f M'%(parIn['filename_base'],ee, parIn['sizeImage']))
outstr = '\n%s ###### file:%s: upload error:%s , size:%.2f M'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,parIn['filename_base'],ee,parIn['sizeImage'])
fp_log.write(outstr);fp_log.flush()
def dic2str(dic):
st=''
for key in dic.keys():
st='%s %s:%s,'%(st,key,dic[key])
return st
def createJsonInput(filename,offlineUrl,questionUrl):
flag = True
filename_base = os.path.basename(filename)
filename_OR=filename.replace('_AI.','_OR.')
if not os.path.exists(filename_OR ):
return False
uid,sourceType, sourceId,typename = parse_filename(filename_base)
if (typename not in name_dic.keys()) or (typename == '排口'):
return False
api = questionUrl if sourceType=='live' else offlineUrl
time_str = get_time(filename_base)
input_ ={
'imgName':os.path.basename(filename),
'imgNameOriginal':os.path.basename(filename_OR),
'time':time_str,
'fid':uid, ###随机16位字符
'type':name_dic[typename],###这次先采用 ["排口","污口","水生植被","漂浮物","其它"]
'typeId':nameID_dic[typename]
}
if sourceType!='live':
input_['code']=sourceId;###只有离线视频才需要code
dic_str = dic2str(input_)
t1 = time.time()
image_code, image_code_OR = b64encode_function(filename, filename_OR)
input_['imgData']=image_code
input_['imgDataOriginal']=image_code_OR
sizeImage = (len(image_code) + len(image_code_OR) )/1000000.0
parOut={};parOut['flag']=True;parOut['input_']=input_;
parOut['sizeImage']=sizeImage;parOut['dic_str']=dic_str;
parOut['filename']=filename;parOut['filename_OR']=filename_OR;
parOut['api']=api ; parOut['t1']=t1 ; parOut['filename_base']= filename_base
return parOut
def getLogFileFp(streamName):
logname ='SendLog/'+ time.strftime("%Y-%m-%d", time.localtime())+'_%s.txt'%(streamName)
if os.path.exists(logname):
fp_log = open(logname,'a+')
else:
fp_log = open(logname,'w')
return
def lodaMsgInfos(jsonDir,msgId):
jsonUrl = os.path.join(jsonDir,msgId+'.json')
with open(jsonUrl,'r') as fp:
data=json.load(fp)
return data
def parse_filename_for_oss(name):
splts=name.split('_')
typename=splts[2].split('-')[1].strip()
msgId=splts[4].split('-')[3]
onLineType=splts[4].split('-')[1]
return typename,msgId,onLineType
msg_dict_off={
"msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
"biz_id":"hehuzhang",#业务标识
"mod_id":"ai",#模型标识
"status":"running",#任务状态
"type":str(1),#数据类型1图片 2视频
"error":9999,#错误信息
"results":[#问题结果
{
"original_url":"",#原图地址
"sign_url":"",#AI标记地址
"category_id":"",#分类标识
"description":"",#问题描述
"time":"",#时间戳
}
]
}
msg_dict_on={
"msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
"biz_id":"hehuzhang",#业务标识
"mod_id":"qi",#模型标识
"status":"running",#任务状态
"type":str(2),#数据类型1图片 2视频
"error":9999,#错误信息
"results":[#问题结果
{
"original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
"sign_url":"",#识别后视频地址
}
]
}
def update_json(jsonOri,jsonNew,offkeys=["msg_id","biz_id" ,"mod_id" ]):
#{'biz_id': 'hehuzhang', 'mod_id': 'ai', 'msg_id': 'bblvgyntTsZCamqjuLArkiSYIbKXEeWx', 'offering_id': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'offering_type': 'mp4', 'results_base_dir': 'XJRW20220317153547', 'inSource': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'outSource': 'NO'}
for key in offkeys:
jsonNew[key] = jsonOri[key]
return jsonNew
def test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar):
time0_0 = time.time()
logname ='SendLog/'+ time.strftime("%Y-%m-%d.txt", time.localtime())
if os.path.exists(logname):
fp_log = open(logname,'a+')
else:
fp_log = open(logname,'w')
ifind=0
time0_0 = time.time()
producer = KafkaProducer(
bootstrap_servers=kafkaPar['boostServer'],#tencent yun
value_serializer=lambda v: v.encode('utf-8'))
###登陆准备存储桶
auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
# Endpoint以杭州为例其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
while True:
#filelist = os.listdir(indir)
filelist_AI = sorted(glob.glob('%s/*_AI.*'%(indir)),key=os.path.getmtime)
filelist = filelist_AI
if len(filelist)!=0:
time0 = time.time()
for filename in filelist[0:2]:
filename_base = os.path.basename(filename)
##解析文件名
typename,msgId,onLineType = parse_filename_for_oss(filename_base)
##存储文件
filename_OR=filename.replace('_AI.','_OR.')
if typename!='结束':
ObjectName_AI=os.path.join(ossPar['bucketName'],os.path.basename(filename))
ObjectName_OR=os.path.join(ossPar['bucketName'],os.path.basename(filename_OR))
bucket.put_object_from_file(ObjectName_AI, filename)
bucket.put_object_from_file(ObjectName_OR, filename_OR)
taskInfos = lodaMsgInfos(jsonDir,msgId)
#print(taskInfos)
##发送返回信息
#if onLineType=='off':
msg = msg_dict_off
msg['results'][0]['original_url']= ObjectName_OR
msg['results'][0]['sign_url']= ObjectName_AI
msg['results'][0]['category_id']= nameID_dic[typename]
msg['results'][0]['description']= typename
msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
msg = update_json(taskInfos,msg)
else:
msg = msg_dict_on
videoList = sorted(glob.glob('%s/*'%(videoBakDir)),key=os.path.getmtime)
videoName = os.path.basename(videoList[0])
msg["status"]="success";msg["msg_id"]=msgId
ObjectName_AI=os.path.join(ossPar['bucketName'],videoName)
bucket.put_object_from_file(ObjectName_AI, videoList[0])
msg['results'][0]['original_url']= ObjectName_AI
msg['results'][0]['sign_url']= ObjectName_AI###最新的视频文件
print('###'*3,'Send:',filename)
msg = json.dumps(msg, ensure_ascii=False)
future = producer.send(
kafkaPar['topic'],
msg
)
print('***'*20,' Send transfer ',onLineType,msg)
##上传后的图片,移走到另外一个文件夹###
cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
else:
time.sleep(1)
fp_log.close()
if __name__=='__main__':
indir='problems/images_tmp'
outdir='problems/images_save'
jsonDir = 'mintors/kafka/'
videoBakDir='../../data/video_live_bak/1945'
ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com',
'AId':'LTAI5tSJ62TLMUb4SZuf285A',
'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7',
'bucketName':'ta-tech-image',
}
#kafkaPar={'boostServer':['212.129.223.66:9092'],'topic':'testReturn'}
kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'}
test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar)