# Problem-image sender: uploads detected problem images to Aliyun OSS and
# publishes result messages to Kafka.
import base64
import glob
import io
import json
import os
import random
import string
import sys
import time
from multiprocessing import Process, Queue

import cv2
import numpy as np
import oss2
import requests
from kafka import KafkaProducer, KafkaConsumer
from PIL import Image
|
||
##for CeKanYuan
|
||
#10月21日,通过图像名称判断,是那个平台。方式不好。
|
||
#10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。
|
||
#3月18日,采用OSS阿里云存储桶
|
||
#platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
|
||
platform_query_url='SendLog/platformQuery.json'
|
||
api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
|
||
#api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
|
||
|
||
##这套名字,是联通的。
|
||
name_dic={
|
||
"排口":"入河、湖排口",
|
||
"疑似污口": "入河、湖排口",
|
||
"水生植被": "水生植物",
|
||
"漂浮物": "有大面积漂物",
|
||
"结束": "结束",
|
||
'其它' :'其它'
|
||
}
|
||
## for TH river
|
||
##这套代码是河长制度的。
|
||
nameID_dic={
|
||
"排口":'00000',
|
||
"疑似污口": '8378',
|
||
"水生植被": '8380',
|
||
"漂浮物": '8368',
|
||
"结束":'9999',
|
||
'其它':'8888'
|
||
}
|
||
|
||
def get_time(filename):
|
||
#2021-10-09-11-44-51_frame-598-720_type-水生植被.jpg
|
||
sps=filename.strip().split('_')[0]
|
||
tsps=sps.split('-')
|
||
return '%s-%s-%s %s:%s:%s'%(tsps[0],tsps[1],tsps[2],tsps[3],tsps[4],tsps[5])
|
||
def get_ms(time0,time1):
|
||
str_time ='%.2f ms'%((time1-time0)*1000)
|
||
return str_time
|
||
|
||
def get_urls( platform_query_url,fp_log ):
|
||
try:
|
||
if os.path.exists(platform_query_url):
|
||
#print('###line49')
|
||
with open('SendLog/platformQuery.json','r') as fp:
|
||
res = json.load(fp)
|
||
else:
|
||
res = requests.get(platform_query_url,timeout=10).json()
|
||
#print('###line54')
|
||
questionUrl = res['data']['questionUrl'] ###直播流时,问题图片的推送地址
|
||
offlineUrl = res['data']['offlineUrl'] ###http离线视频时,问题图片的推送地址
|
||
except Exception as ee:
|
||
timestr=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
print('###### %s: file:send_transfer: error %s ,url:%s #####'%(timestr,ee,platform_query_url))
|
||
outstr = '\n %s ###### get url platform error : update error:%s , url:%s'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,ee,platform_query_url)
|
||
fp_log.write(outstr);fp_log.flush()
|
||
questionUrl="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
|
||
offlineUrl ="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
|
||
return questionUrl,offlineUrl
|
||
def parse_filename(filename_base):
|
||
#etc:2022-01-13-16-04-17_frame-823-1440_type-水生植被_hgYFEulc0dPIrG1S_s-off-XJRW20220113154959_AI.jpg
|
||
uid =filename_base.split('.')[0].split('_')[3].strip()
|
||
sourceType=filename_base.split('_')[4].split('-')[1]
|
||
sourceId=filename_base.split('_')[4].split('-')[2]
|
||
typename=filename_base.split('.')[0].split('_')[2].split('-')[1].strip()
|
||
return uid,sourceType,sourceId,typename
|
||
def b64encode_function(filename, filename_OR):
|
||
if os.path.exists(filename):
|
||
image_ori=cv2.imread(filename)
|
||
image_ori_OR=cv2.imread(filename_OR)
|
||
else:
|
||
image_ori = filename.copy()
|
||
image_ori_OR = image_ori_OR.copy()
|
||
image_pngcode = cv2.imencode('.jpg',image_ori)[-1]
|
||
image_pngcode_OR = cv2.imencode('.jpg',image_ori_OR)[-1]
|
||
image_code = str(base64.b64encode(image_pngcode))[2:-1]
|
||
image_code_OR = str(base64.b64encode(image_pngcode_OR))[2:-1]
|
||
return image_code, image_code_OR
|
||
def JsonSend(parIn):
|
||
|
||
fp_log = parIn['fp_log']
|
||
try:
|
||
response=requests.post(parIn['api'],json=parIn['input_'],timeout=10).json()
|
||
t3 = time.time()
|
||
print('\n file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s, size:%.2f M \n'%(parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],api,response['code'],parIn['sizeImage']))
|
||
outstr = '%s file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s,size:%.2f M ,%s\n'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],parIn['api'],response['code'],parIn['sizeImage'],parIn['dic_str'])
|
||
fp_log.write(outstr);fp_log.flush()
|
||
|
||
except Exception as ee:
|
||
print('\n ######file:%s: upload error:%s,size:%.2f M'%(parIn['filename_base'],ee, parIn['sizeImage']))
|
||
outstr = '\n%s ###### file:%s: upload error:%s , size:%.2f M'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,parIn['filename_base'],ee,parIn['sizeImage'])
|
||
fp_log.write(outstr);fp_log.flush()
|
||
|
||
|
||
def dic2str(dic):
|
||
st=''
|
||
for key in dic.keys():
|
||
st='%s %s:%s,'%(st,key,dic[key])
|
||
return st
|
||
def createJsonInput(filename,offlineUrl,questionUrl):
|
||
flag = True
|
||
filename_base = os.path.basename(filename)
|
||
filename_OR=filename.replace('_AI.','_OR.')
|
||
if not os.path.exists(filename_OR ):
|
||
return False
|
||
|
||
uid,sourceType, sourceId,typename = parse_filename(filename_base)
|
||
if (typename not in name_dic.keys()) or (typename == '排口'):
|
||
return False
|
||
api = questionUrl if sourceType=='live' else offlineUrl
|
||
|
||
time_str = get_time(filename_base)
|
||
input_ ={
|
||
'imgName':os.path.basename(filename),
|
||
'imgNameOriginal':os.path.basename(filename_OR),
|
||
'time':time_str,
|
||
'fid':uid, ###随机16位字符
|
||
'type':name_dic[typename],###这次先采用 ["排口","污口","水生植被","漂浮物","其它"]
|
||
'typeId':nameID_dic[typename]
|
||
|
||
}
|
||
if sourceType!='live':
|
||
input_['code']=sourceId;###只有离线视频才需要code,
|
||
|
||
dic_str = dic2str(input_)
|
||
t1 = time.time()
|
||
|
||
image_code, image_code_OR = b64encode_function(filename, filename_OR)
|
||
input_['imgData']=image_code
|
||
input_['imgDataOriginal']=image_code_OR
|
||
|
||
sizeImage = (len(image_code) + len(image_code_OR) )/1000000.0
|
||
|
||
parOut={};parOut['flag']=True;parOut['input_']=input_;
|
||
parOut['sizeImage']=sizeImage;parOut['dic_str']=dic_str;
|
||
parOut['filename']=filename;parOut['filename_OR']=filename_OR;
|
||
parOut['api']=api ; parOut['t1']=t1 ; parOut['filename_base']= filename_base
|
||
return parOut
|
||
|
||
def getLogFileFp(streamName):
|
||
logname ='SendLog/'+ time.strftime("%Y-%m-%d", time.localtime())+'_%s.txt'%(streamName)
|
||
if os.path.exists(logname):
|
||
fp_log = open(logname,'a+')
|
||
else:
|
||
fp_log = open(logname,'w')
|
||
return
|
||
|
||
def lodaMsgInfos(jsonDir,msgId):
|
||
jsonUrl = os.path.join(jsonDir,msgId+'.json')
|
||
with open(jsonUrl,'r') as fp:
|
||
data=json.load(fp)
|
||
return data
|
||
|
||
def parse_filename_for_oss(name):
|
||
splts=name.split('_')
|
||
typename=splts[2].split('-')[1].strip()
|
||
msgId=splts[4].split('-')[3]
|
||
onLineType=splts[4].split('-')[1]
|
||
return typename,msgId,onLineType
|
||
|
||
msg_dict_off={
|
||
"msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
|
||
"biz_id":"hehuzhang",#业务标识
|
||
"mod_id":"ai",#模型标识
|
||
"status":"running",#任务状态
|
||
"type":str(1),#数据类型:1图片 2视频
|
||
"error":9999,#错误信息
|
||
"results":[#问题结果
|
||
{
|
||
"original_url":"",#原图地址
|
||
"sign_url":"",#AI标记地址
|
||
"category_id":"",#分类标识
|
||
"description":"",#问题描述
|
||
"time":"",#时间戳
|
||
}
|
||
]
|
||
}
|
||
|
||
msg_dict_on={
|
||
"msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
|
||
"biz_id":"hehuzhang",#业务标识
|
||
"mod_id":"qi",#模型标识
|
||
"status":"running",#任务状态
|
||
"type":str(2),#数据类型:1图片 2视频
|
||
"error":9999,#错误信息
|
||
"results":[#问题结果
|
||
{
|
||
"original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
|
||
"sign_url":"",#识别后视频地址
|
||
}
|
||
]
|
||
}
|
||
|
||
def update_json(jsonOri,jsonNew,offkeys=["msg_id","biz_id" ,"mod_id" ]):
|
||
#{'biz_id': 'hehuzhang', 'mod_id': 'ai', 'msg_id': 'bblvgyntTsZCamqjuLArkiSYIbKXEeWx', 'offering_id': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'offering_type': 'mp4', 'results_base_dir': 'XJRW20220317153547', 'inSource': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'outSource': 'NO'}
|
||
for key in offkeys:
|
||
jsonNew[key] = jsonOri[key]
|
||
return jsonNew
|
||
def test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar):
|
||
|
||
|
||
time0_0 = time.time()
|
||
logname ='SendLog/'+ time.strftime("%Y-%m-%d.txt", time.localtime())
|
||
if os.path.exists(logname):
|
||
fp_log = open(logname,'a+')
|
||
else:
|
||
fp_log = open(logname,'w')
|
||
ifind=0
|
||
time0_0 = time.time()
|
||
|
||
producer = KafkaProducer(
|
||
bootstrap_servers=kafkaPar['boostServer'],#tencent yun
|
||
value_serializer=lambda v: v.encode('utf-8'))
|
||
|
||
|
||
###登陆准备存储桶
|
||
auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
|
||
# Endpoint以杭州为例,其它Region请按实际情况填写。
|
||
bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
|
||
|
||
|
||
while True:
|
||
#filelist = os.listdir(indir)
|
||
filelist_AI = sorted(glob.glob('%s/*_AI.*'%(indir)),key=os.path.getmtime)
|
||
filelist = filelist_AI
|
||
|
||
if len(filelist)!=0:
|
||
time0 = time.time()
|
||
for filename in filelist[0:2]:
|
||
filename_base = os.path.basename(filename)
|
||
##解析文件名
|
||
typename,msgId,onLineType = parse_filename_for_oss(filename_base)
|
||
|
||
##存储文件
|
||
filename_OR=filename.replace('_AI.','_OR.')
|
||
|
||
|
||
|
||
if typename!='结束':
|
||
ObjectName_AI=os.path.join(ossPar['bucketName'],os.path.basename(filename))
|
||
ObjectName_OR=os.path.join(ossPar['bucketName'],os.path.basename(filename_OR))
|
||
bucket.put_object_from_file(ObjectName_AI, filename)
|
||
bucket.put_object_from_file(ObjectName_OR, filename_OR)
|
||
taskInfos = lodaMsgInfos(jsonDir,msgId)
|
||
#print(taskInfos)
|
||
##发送返回信息
|
||
#if onLineType=='off':
|
||
msg = msg_dict_off
|
||
msg['results'][0]['original_url']= ObjectName_OR
|
||
msg['results'][0]['sign_url']= ObjectName_AI
|
||
msg['results'][0]['category_id']= nameID_dic[typename]
|
||
msg['results'][0]['description']= typename
|
||
msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
msg = update_json(taskInfos,msg)
|
||
else:
|
||
msg = msg_dict_on
|
||
videoList = sorted(glob.glob('%s/*'%(videoBakDir)),key=os.path.getmtime)
|
||
videoName = os.path.basename(videoList[0])
|
||
msg["status"]="success";msg["msg_id"]=msgId
|
||
ObjectName_AI=os.path.join(ossPar['bucketName'],videoName)
|
||
bucket.put_object_from_file(ObjectName_AI, videoList[0])
|
||
msg['results'][0]['original_url']= ObjectName_AI
|
||
msg['results'][0]['sign_url']= ObjectName_AI###最新的视频文件
|
||
|
||
print('###'*3,'Send:',filename)
|
||
msg = json.dumps(msg, ensure_ascii=False)
|
||
future = producer.send(
|
||
kafkaPar['topic'],
|
||
msg
|
||
)
|
||
print('***'*20,' Send transfer ',onLineType,msg)
|
||
|
||
##上传后的图片,移走到另外一个文件夹###
|
||
cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
|
||
cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
|
||
|
||
else:
|
||
time.sleep(1)
|
||
fp_log.close()
|
||
|
||
if __name__=='__main__':
|
||
indir='problems/images_tmp'
|
||
outdir='problems/images_save'
|
||
jsonDir = 'mintors/kafka/'
|
||
videoBakDir='../../data/video_live_bak/1945'
|
||
ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com',
|
||
'AId':'LTAI5tSJ62TLMUb4SZuf285A',
|
||
'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7',
|
||
'bucketName':'ta-tech-image',
|
||
}
|
||
#kafkaPar={'boostServer':['212.129.223.66:9092'],'topic':'testReturn'}
|
||
kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'}
|
||
test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar)
|