You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

DSP_Send_tranfer_oss.py 16KB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. from PIL import Image
  2. import numpy as np
  3. import cv2
  4. import base64
  5. import io,os
  6. import requests
  7. import time,json
  8. import string,random
  9. import glob,string,sys
  10. from multiprocessing import Process,Queue
  11. import oss2,copy
  12. from kafka import KafkaProducer, KafkaConsumer
  13. from utilsK.sendUtils import *
  14. from utilsK.masterUtils import create_logFile,wrtiteLog,writeELK_log,send_kafka
  15. from voduploadsdk.UploadVideoRequest import UploadVideoRequest
  16. from voduploadsdk.AliyunVodUtils import *
  17. from voduploadsdk.AliyunVodUploader import AliyunVodUploader
  18. import hashlib
  19. from kafka.errors import kafka_errors
  20. ##for CeKanYuan
  21. #10月21日,通过图像名称判断,是那个平台。方式不好。
  22. #10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。
  23. #3月18日,采用OSS阿里云存储桶
  24. #platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
  25. platform_query_url='SendLog/platformQuery.json'
  26. api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
  27. #api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
  28. ##这套名字,是联通的。
  29. name_dic={
  30. "排口":"入河、湖排口",
  31. "排污口": "入河、湖排口",
  32. "水生植被": "水生植物",
  33. "漂浮物": "有大面积漂物",
  34. "结束": "结束",
  35. '其它' :'其它'
  36. }
  37. ## for TH river
  38. ##这套代码是河长制度的。
  39. '''
  40. nameID_dic={
  41. "排口":'00000',
  42. "排污口": '8378',
  43. "水生植被": '8380',
  44. "漂浮物": '8368',
  45. "结束":'9999',
  46. "其它":'8888'
  47. }
  48. '''
  49. msg_dict_off={
  50. "request_id":"fflvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
  51. "status":"running",#任务状态
  52. "type":str(2),#消息类型 1:实时 2:离线
  53. #"error":str(9999),#错误信息####
  54. "error_code":"",#//错误编号
  55. "error_msg":"",#//错误描述
  56. "progress":"",
  57. "results":[#问题结果
  58. {
  59. "original_url":"",#原图地址
  60. "sign_url":"",#AI标记地址
  61. "category_id":"",#分类标识
  62. "description":"",#问题描述
  63. "analyse_time":"",#时间戳
  64. }
  65. ]
  66. }
  67. msg_dict_on={
  68. "request_id":"nnlvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
  69. "status":"running",#任务状态
  70. "type":str(1),#消息类型 1:实时 2:离线
  71. "error_code":"",#//错误编号
  72. "error_msg":"",#//错误描述
  73. "progressOn":"",
  74. "results":[#问题结果
  75. {
  76. "original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
  77. "sign_url":"",#识别后视频地址
  78. }
  79. ]
  80. }
  81. def mintor_offline_ending(parIn):
  82. indir,server,topic,fp_log = parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']
  83. par_kafka={};par_kafka['server']=server;par_kafka['topic']=topic;
  84. logger = parIn['logger'];thread='Send-tranfer-oss:mintor-offline-ending'
  85. time_interval = parIn['timeInterval']
  86. ###轮询image_tmp的文件夹,每10s一次,一旦产生离线结束标志,则不停地发送没30秒heartbeat信号。
  87. producer = KafkaProducer(
  88. bootstrap_servers=par_kafka['server'],#tencent yun
  89. value_serializer=lambda v: v.encode('utf-8'),
  90. metadata_max_age_ms=120000,
  91. )
  92. outStrList={}
  93. writeELK_log(msg='child processs starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  94. time_ss0=time.time()
  95. while True:
  96. filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
  97. filelist = filelist_AI
  98. off_msgs=[]
  99. for filename in filelist[0:]:
  100. filename_base = os.path.basename(filename)
  101. ##解析文件名
  102. typename,requestId,onLineType = parse_filename_for_oss(filename_base)
  103. if (onLineType=='off') and (typename=='结束'):
  104. off_msgs.append(requestId)
  105. for requestId in off_msgs:
  106. msg_heart = copy.deepcopy(msg_dict_off)
  107. msg_heart['status']='running'
  108. msg_heart["request_id"]=requestId
  109. msg_heart = json.dumps(msg_heart, ensure_ascii=False)
  110. outStrList['success']= '----- send heartBeat in Transfer success, msg:%s '%(requestId)
  111. outStrList['failure']='----- kafka error when sending heartBeat in Transfer '
  112. outStrList['Refailure']='----- kafka error when Re-sending heartBeat in Transfer '
  113. send_kafka(producer,par_kafka,msg_heart,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  114. time.sleep(time_interval)
  115. time_ss1=time.time()
  116. if time_ss1 - time_ss0>120:
  117. outstrs = 'child process sleeping:%f s '%(time_ss1-time_ss0)
  118. writeELK_log(msg=outstrs,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  119. time_ss0=time_ss1
def test5(par):
    """Main loop of the Send_tranfer_oss process.

    Polls *indir* for ``*_AI.txt`` result markers produced by the detector.
    For intermediate detections it uploads the original/AI image pair to OSS
    and publishes an offline/online problem message to Kafka; for end markers
    ('结束' / '超时结束') it uploads the finished video(s) to Aliyun VOD with
    up to 3 retries and publishes the final status.  Processed marker files
    are then moved to *outdir*.  Also spawns a child process that sends
    heartbeats for finishing offline tasks.  Runs forever.

    par -- configuration dict loaded from conf/send_oss.json (directories,
           OSS/VOD credentials, Kafka settings; schema assumed from usage
           here -- TODO confirm against the config file).
    """
    indir,outdir,logdir,jsonDir = par['indir'],par['outdir'],par['logdir'],par['jsonDir']
    hearBeatTimeMs = par['hearBeatTimeMs']
    videoBakDir,ossPar,vodPar,kafkaPar = par['videoBakDir'], par['ossPar'],par['vodPar'],par['kafkaPar']
    time0_0 = time.time();logname='SendPics.log';thread='Send-tranfer-oss:main'
    fp_log=create_logFile(logdir=logdir,name=logname)
    logger=logdir.replace('/','.')+'.'+logname
    writeELK_log(msg='Send_tranfer_oss process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
    # Live label -> category-id mapping (replaces the commented-out module-level one).
    nameID_dic=getNamedic(par['labelnamesFile'].strip())
    parIn={};#### parameters handed to the heartbeat child process
    parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']=indir,kafkaPar['boostServer'],kafkaPar['topic'],fp_log
    parIn['timeInterval'] = hearBeatTimeMs;parIn['logger']=logger
    HeartProcess=Process(target=mintor_offline_ending,name='process-sendHeartOnly',args=(parIn,))
    HeartProcess.start()
    ifind=0
    time0_0 = time.time()
    producer = KafkaProducer(
        bootstrap_servers=kafkaPar['boostServer'],#tencent yun
        value_serializer=lambda v: v.encode('utf-8'),
        metadata_max_age_ms=120000,
        )
    ### sign in to the OSS storage bucket
    auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
    # The endpoint example uses Hangzhou; set the actual region in the config.
    bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
    ## VOD client and uploader for finished videos
    clt = init_vod_client(vodPar['AId'], vodPar['ASt'])
    uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])
    writeELK_log(msg='Load Parameter over',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
    par_heart={};outStrList={}
    time_b0=time.time()
    while True:
        filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
        # Keep everything except the '其它' (other) label.
        filelist=[]
        for filename in filelist_AI:
            filename_base = os.path.basename(filename)
            typename,requestId,onLineType = parse_filename_for_oss(filename_base)
            if typename in ["其它"]:
                continue
            filelist.append(filename)
        if len(filelist)!=0:
            time0 = time.time()
            for filename in filelist[0:]:
                filename_base = os.path.basename(filename)
                ## parse the file name into (label, request id, on/off-line flag)
                typename,requestId,onLineType = parse_filename_for_oss(filename_base)
                ## derived names for the original (_OR) / annotated (_AI) image pair
                filename_OR=filename.replace('_AI.','_OR.')
                filename_AI_image = filename.replace('.txt','.jpg')
                filename_OR_image = filename_OR.replace('.txt','.jpg')
                taskInfos = lodaMsgInfos(jsonDir,requestId)
                oss_dir = taskInfos['results_base_dir']
                #if typename in ["排口","其它"]:
                #    continue
                if typename not in ['结束','超时结束']:
                    # Intermediate detection: push both images to OSS and build
                    # a problem message from the offline template.
                    time_s1 = time.time()
                    ObjectName_AI=os.path.join(oss_dir,os.path.basename(filename_AI_image))
                    ObjectName_OR=os.path.join(oss_dir,os.path.basename(filename_OR_image))
                    bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
                    ret2=bucket.put_object_from_file(ObjectName_OR, filename_OR_image)
                    outstr=' oss bucket upload %s %s %s '%('***'*3,'Send:',filename)
                    #outstr=wrtiteLog(fp_log,outstr);print( outstr);
                    writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                    msg = copy.deepcopy(msg_dict_off)
                    if onLineType!='off': msg['type']=str(1)
                    else: msg['type']=str(2)
                    msg['results'][0]['original_url']= ObjectName_OR
                    msg['results'][0]['sign_url']= ObjectName_AI
                    msg['results'][0]['category_id']= nameID_dic[typename]
                    msg['results'][0]['description']= typename
                    msg['results'][0]['analyse_time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                    # update_json presumably merges request_id etc. from taskInfos -- TODO confirm.
                    msg = update_json(taskInfos,msg)
                    time_s2 = time.time()
                else:
                    # Task end or timeout end: upload the final video(s) to VOD,
                    # retrying up to 3 times before reporting failure.
                    time_s1 = time.time()
                    if onLineType!='off':
                        # Realtime task: upload both the annotated and original videos.
                        msg = copy.deepcopy(msg_dict_on)
                        msg["request_id"]=requestId ;msg['type']=str(1)
                        msg['results'][0]['original_url']= "yourAddess"
                        msg['results'][0]['sign_url']= "yourAddess"### the latest video file
                        upCnt=1;upLoaded=False
                        while upCnt<4:
                            try:
                                videoUrl= get_videoUurl(videoBakDir,requestId+'_AI.MP4')
                                assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
                                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                                videoId = uploader.uploadLocalVideo(uploadVideoRequest)
                                VideoId_AI=str(videoId['VideoId'])
                                videoUrl= get_videoUurl(videoBakDir,requestId+'_OR.MP4')
                                assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
                                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                                videoId = uploader.uploadLocalVideo(uploadVideoRequest)
                                VideoId_OR=str(videoId['VideoId'])
                                outstr=VideoId_OR+','+VideoId_AI
                                writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                                msg['results'][0]['sign_url']=VideoId_AI
                                msg['results'][0]['original_url']=VideoId_OR
                                upCnt=4;upLoaded=True  # success: leave the retry loop
                            except Exception as e:
                                writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
                                upCnt+=1;upLoaded=False
                        if not upLoaded:
                            msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
                    else:
                        # Offline task: upload the single merged video.
                        msg = copy.deepcopy(msg_dict_off)
                        msg['type']=str(2)
                        msg["request_id"]=requestId
                        msg['results'][0]['original_url']= taskInfos['original_url']
                        videoUrl= get_videoUurl(videoBakDir,requestId+'.MP4')
                        assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
                        upCnt=1;upLoaded=False
                        while upCnt<4:
                            try:
                                uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
                                videoId = uploader.uploadLocalVideo(uploadVideoRequest)
                                outstr=' oss upload video over %s '%(str(videoId['VideoId']))
                                writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                                msg['results'][0]['sign_url']= str(videoId['VideoId'])### the latest video file
                                upCnt=4;upLoaded=True  # success: leave the retry loop
                            except Exception as e:
                                writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
                                upCnt+=1;upLoaded=False
                        if not upLoaded:
                            msg['error_msg']='video uploading failure' ; msg['error_code']='101' ;
                    if upLoaded:
                        if typename=='结束': msg["status"]="success"
                        else: msg["status"]="timeout"
                    else:
                        msg["status"]='failed'
                    time_s2 = time.time()
                msg = json.dumps(msg, ensure_ascii=False)
                future = producer.send(
                    kafkaPar['topic'],
                    msg
                    )
                try:
                    record_metadata = future.get()
                    outstr='kafka send:%s msg:%s producer status:%s'%(onLineType,msg,producer.bootstrap_connected())
                    #outstr=wrtiteLog(fp_log,outstr);print( outstr);
                    writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
                except Exception as e:
                    # First send failed: rebuild the producer and retry once.
                    outstr='kafka ERROR:%s'%(str(e))
                    #outstr=wrtiteLog(fp_log,outstr);print( outstr);
                    writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger)
                    producer.close()
                    producer = KafkaProducer(
                        bootstrap_servers=kafkaPar['boostServer'],#tencent yun
                        value_serializer=lambda v: v.encode('utf-8')
                        )
                    try:
                        future = producer.send(kafkaPar['topic'], msg).get()
                    except Exception as e:
                        outstr='kafka resend ERROR:%s'%(str(e))
                        #poutstr=wrtiteLog(fp_log,outstr);print( outstr);
                        writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='ERROR',line=sys._getframe().f_lineno,logger=logger)
                time_s3 = time.time()
                ## after uploading, move the processed files to another folder ###
                cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
                cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
                time_s4 = time.time()
                print('-'*50)
        else:
            time.sleep(1)
        # Periodic "still alive" log entry, at most once per 120 s.
        time_b1=time.time()
        if time_b1-time_b0>120:
            writeELK_log(msg='send main process sleeping',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
            time_b0=time_b1
    fp_log.close()  # NOTE(review): unreachable -- the loop above never exits
  288. if __name__=='__main__':
  289. masterFile="conf/send_oss.json"
  290. assert os.path.exists(masterFile)
  291. with open(masterFile,'r') as fp:
  292. par=json.load(fp)
  293. test5(par)