You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

304 satır
13KB

  1. from PIL import Image
  2. import numpy as np
  3. import cv2
  4. import base64
  5. import io,os
  6. import requests
  7. import time,json
  8. import string,random
  9. import glob,string,sys
  10. from multiprocessing import Process,Queue
  11. import oss2
  12. from kafka import KafkaProducer, KafkaConsumer
  13. ##for CeKanYuan
  14. #10月21日,通过图像名称判断,是那个平台。方式不好。
  15. #10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。
  16. #3月18日,采用OSS阿里云存储桶
  17. #platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
  18. platform_query_url='SendLog/platformQuery.json'
  19. api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
  20. #api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
  21. ##这套名字,是联通的。
  22. name_dic={
  23. "排口":"入河、湖排口",
  24. "疑似污口": "入河、湖排口",
  25. "水生植被": "水生植物",
  26. "漂浮物": "有大面积漂物",
  27. "结束": "结束",
  28. '其它' :'其它'
  29. }
  30. ## for TH river
  31. ##这套代码是河长制度的。
  32. nameID_dic={
  33. "排口":'00000',
  34. "疑似污口": '8378',
  35. "水生植被": '8380',
  36. "漂浮物": '8368',
  37. "结束":'9999',
  38. '其它':'8888'
  39. }
  40. def get_time(filename):
  41. #2021-10-09-11-44-51_frame-598-720_type-水生植被.jpg
  42. sps=filename.strip().split('_')[0]
  43. tsps=sps.split('-')
  44. return '%s-%s-%s %s:%s:%s'%(tsps[0],tsps[1],tsps[2],tsps[3],tsps[4],tsps[5])
  45. def get_ms(time0,time1):
  46. str_time ='%.2f ms'%((time1-time0)*1000)
  47. return str_time
  48. def get_urls( platform_query_url,fp_log ):
  49. try:
  50. if os.path.exists(platform_query_url):
  51. #print('###line49')
  52. with open('SendLog/platformQuery.json','r') as fp:
  53. res = json.load(fp)
  54. else:
  55. res = requests.get(platform_query_url,timeout=10).json()
  56. #print('###line54')
  57. questionUrl = res['data']['questionUrl'] ###直播流时,问题图片的推送地址
  58. offlineUrl = res['data']['offlineUrl'] ###http离线视频时,问题图片的推送地址
  59. except Exception as ee:
  60. timestr=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  61. print('###### %s: file:send_transfer: error %s ,url:%s #####'%(timestr,ee,platform_query_url))
  62. outstr = '\n %s ###### get url platform error : update error:%s , url:%s'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,ee,platform_query_url)
  63. fp_log.write(outstr);fp_log.flush()
  64. questionUrl="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
  65. offlineUrl ="http://47.96.182.154:9040/api/taskFile/submitUAVKHQuestion"
  66. return questionUrl,offlineUrl
  67. def parse_filename(filename_base):
  68. #etc:2022-01-13-16-04-17_frame-823-1440_type-水生植被_hgYFEulc0dPIrG1S_s-off-XJRW20220113154959_AI.jpg
  69. uid =filename_base.split('.')[0].split('_')[3].strip()
  70. sourceType=filename_base.split('_')[4].split('-')[1]
  71. sourceId=filename_base.split('_')[4].split('-')[2]
  72. typename=filename_base.split('.')[0].split('_')[2].split('-')[1].strip()
  73. return uid,sourceType,sourceId,typename
  74. def b64encode_function(filename, filename_OR):
  75. if os.path.exists(filename):
  76. image_ori=cv2.imread(filename)
  77. image_ori_OR=cv2.imread(filename_OR)
  78. else:
  79. image_ori = filename.copy()
  80. image_ori_OR = image_ori_OR.copy()
  81. image_pngcode = cv2.imencode('.jpg',image_ori)[-1]
  82. image_pngcode_OR = cv2.imencode('.jpg',image_ori_OR)[-1]
  83. image_code = str(base64.b64encode(image_pngcode))[2:-1]
  84. image_code_OR = str(base64.b64encode(image_pngcode_OR))[2:-1]
  85. return image_code, image_code_OR
  86. def JsonSend(parIn):
  87. fp_log = parIn['fp_log']
  88. try:
  89. response=requests.post(parIn['api'],json=parIn['input_'],timeout=10).json()
  90. t3 = time.time()
  91. print('\n file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s, size:%.2f M \n'%(parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],api,response['code'],parIn['sizeImage']))
  92. outstr = '%s file:%s encodetime:%.5f request time:%.5f,send to %s ,return code:%s,size:%.2f M ,%s\n'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),parIn['filename_base'],parIn['t2']-parIn['t1'],t3-parIn['t2'],parIn['api'],response['code'],parIn['sizeImage'],parIn['dic_str'])
  93. fp_log.write(outstr);fp_log.flush()
  94. except Exception as ee:
  95. print('\n ######file:%s: upload error:%s,size:%.2f M'%(parIn['filename_base'],ee, parIn['sizeImage']))
  96. outstr = '\n%s ###### file:%s: upload error:%s , size:%.2f M'%( time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) ,parIn['filename_base'],ee,parIn['sizeImage'])
  97. fp_log.write(outstr);fp_log.flush()
  98. def dic2str(dic):
  99. st=''
  100. for key in dic.keys():
  101. st='%s %s:%s,'%(st,key,dic[key])
  102. return st
  103. def createJsonInput(filename,offlineUrl,questionUrl):
  104. flag = True
  105. filename_base = os.path.basename(filename)
  106. filename_OR=filename.replace('_AI.','_OR.')
  107. if not os.path.exists(filename_OR ):
  108. return False
  109. uid,sourceType, sourceId,typename = parse_filename(filename_base)
  110. if (typename not in name_dic.keys()) or (typename == '排口'):
  111. return False
  112. api = questionUrl if sourceType=='live' else offlineUrl
  113. time_str = get_time(filename_base)
  114. input_ ={
  115. 'imgName':os.path.basename(filename),
  116. 'imgNameOriginal':os.path.basename(filename_OR),
  117. 'time':time_str,
  118. 'fid':uid, ###随机16位字符
  119. 'type':name_dic[typename],###这次先采用 ["排口","污口","水生植被","漂浮物","其它"]
  120. 'typeId':nameID_dic[typename]
  121. }
  122. if sourceType!='live':
  123. input_['code']=sourceId;###只有离线视频才需要code,
  124. dic_str = dic2str(input_)
  125. t1 = time.time()
  126. image_code, image_code_OR = b64encode_function(filename, filename_OR)
  127. input_['imgData']=image_code
  128. input_['imgDataOriginal']=image_code_OR
  129. sizeImage = (len(image_code) + len(image_code_OR) )/1000000.0
  130. parOut={};parOut['flag']=True;parOut['input_']=input_;
  131. parOut['sizeImage']=sizeImage;parOut['dic_str']=dic_str;
  132. parOut['filename']=filename;parOut['filename_OR']=filename_OR;
  133. parOut['api']=api ; parOut['t1']=t1 ; parOut['filename_base']= filename_base
  134. return parOut
  135. def getLogFileFp(streamName):
  136. logname ='SendLog/'+ time.strftime("%Y-%m-%d", time.localtime())+'_%s.txt'%(streamName)
  137. if os.path.exists(logname):
  138. fp_log = open(logname,'a+')
  139. else:
  140. fp_log = open(logname,'w')
  141. return
  142. def lodaMsgInfos(jsonDir,msgId):
  143. jsonUrl = os.path.join(jsonDir,msgId+'.json')
  144. with open(jsonUrl,'r') as fp:
  145. data=json.load(fp)
  146. return data
  147. def parse_filename_for_oss(name):
  148. splts=name.split('_')
  149. typename=splts[2].split('-')[1].strip()
  150. msgId=splts[4].split('-')[3]
  151. onLineType=splts[4].split('-')[1]
  152. return typename,msgId,onLineType
  153. msg_dict_off={
  154. "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
  155. "biz_id":"hehuzhang",#业务标识
  156. "mod_id":"ai",#模型标识
  157. "status":"running",#任务状态
  158. "type":str(1),#数据类型:1图片 2视频
  159. "error":9999,#错误信息
  160. "results":[#问题结果
  161. {
  162. "original_url":"",#原图地址
  163. "sign_url":"",#AI标记地址
  164. "category_id":"",#分类标识
  165. "description":"",#问题描述
  166. "time":"",#时间戳
  167. }
  168. ]
  169. }
  170. msg_dict_on={
  171. "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
  172. "biz_id":"hehuzhang",#业务标识
  173. "mod_id":"qi",#模型标识
  174. "status":"running",#任务状态
  175. "type":str(2),#数据类型:1图片 2视频
  176. "error":9999,#错误信息
  177. "results":[#问题结果
  178. {
  179. "original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
  180. "sign_url":"",#识别后视频地址
  181. }
  182. ]
  183. }
  184. def update_json(jsonOri,jsonNew,offkeys=["msg_id","biz_id" ,"mod_id" ]):
  185. #{'biz_id': 'hehuzhang', 'mod_id': 'ai', 'msg_id': 'bblvgyntTsZCamqjuLArkiSYIbKXEeWx', 'offering_id': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'offering_type': 'mp4', 'results_base_dir': 'XJRW20220317153547', 'inSource': 'http://vod.play.t-aaron.com/customerTrans/c49a2c620795d124f2ae4b10197b8d0e/303b7a58-17f3ef4494e-0004-f90c-f2c-7ec68.mp4', 'outSource': 'NO'}
  186. for key in offkeys:
  187. jsonNew[key] = jsonOri[key]
  188. return jsonNew
  189. def test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar):
  190. time0_0 = time.time()
  191. logname ='SendLog/'+ time.strftime("%Y-%m-%d.txt", time.localtime())
  192. if os.path.exists(logname):
  193. fp_log = open(logname,'a+')
  194. else:
  195. fp_log = open(logname,'w')
  196. ifind=0
  197. time0_0 = time.time()
  198. producer = KafkaProducer(
  199. bootstrap_servers=kafkaPar['boostServer'],#tencent yun
  200. value_serializer=lambda v: v.encode('utf-8'))
  201. ###登陆准备存储桶
  202. auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
  203. # Endpoint以杭州为例,其它Region请按实际情况填写。
  204. bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
  205. while True:
  206. #filelist = os.listdir(indir)
  207. filelist_AI = sorted(glob.glob('%s/*_AI.*'%(indir)),key=os.path.getmtime)
  208. filelist = filelist_AI
  209. if len(filelist)!=0:
  210. time0 = time.time()
  211. for filename in filelist[0:2]:
  212. filename_base = os.path.basename(filename)
  213. ##解析文件名
  214. typename,msgId,onLineType = parse_filename_for_oss(filename_base)
  215. ##存储文件
  216. filename_OR=filename.replace('_AI.','_OR.')
  217. if typename!='结束':
  218. ObjectName_AI=os.path.join(ossPar['bucketName'],os.path.basename(filename))
  219. ObjectName_OR=os.path.join(ossPar['bucketName'],os.path.basename(filename_OR))
  220. bucket.put_object_from_file(ObjectName_AI, filename)
  221. bucket.put_object_from_file(ObjectName_OR, filename_OR)
  222. taskInfos = lodaMsgInfos(jsonDir,msgId)
  223. #print(taskInfos)
  224. ##发送返回信息
  225. #if onLineType=='off':
  226. msg = msg_dict_off
  227. msg['results'][0]['original_url']= ObjectName_OR
  228. msg['results'][0]['sign_url']= ObjectName_AI
  229. msg['results'][0]['category_id']= nameID_dic[typename]
  230. msg['results'][0]['description']= typename
  231. msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  232. msg = update_json(taskInfos,msg)
  233. else:
  234. msg = msg_dict_on
  235. videoList = sorted(glob.glob('%s/*'%(videoBakDir)),key=os.path.getmtime)
  236. videoName = os.path.basename(videoList[0])
  237. msg["status"]="success";msg["msg_id"]=msgId
  238. ObjectName_AI=os.path.join(ossPar['bucketName'],videoName)
  239. bucket.put_object_from_file(ObjectName_AI, videoList[0])
  240. msg['results'][0]['original_url']= ObjectName_AI
  241. msg['results'][0]['sign_url']= ObjectName_AI###最新的视频文件
  242. print('###'*3,'Send:',filename)
  243. msg = json.dumps(msg, ensure_ascii=False)
  244. future = producer.send(
  245. kafkaPar['topic'],
  246. msg
  247. )
  248. print('***'*20,' Send transfer ',onLineType,msg)
  249. ##上传后的图片,移走到另外一个文件夹###
  250. cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
  251. cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
  252. else:
  253. time.sleep(1)
  254. fp_log.close()
  255. if __name__=='__main__':
  256. indir='problems/images_tmp'
  257. outdir='problems/images_save'
  258. jsonDir = 'mintors/kafka/'
  259. videoBakDir='../../data/video_live_bak/1945'
  260. ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com',
  261. 'AId':'LTAI5tSJ62TLMUb4SZuf285A',
  262. 'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7',
  263. 'bucketName':'ta-tech-image',
  264. }
  265. #kafkaPar={'boostServer':['212.129.223.66:9092'],'topic':'testReturn'}
  266. kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'}
  267. test5(indir,outdir,jsonDir,videoBakDir,ossPar,kafkaPar)