用kafka接收消息
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

372 lines
17KB

import base64
import copy
import glob
import hashlib
import io
import json
import os
import random
import shutil
import string
import sys
import time
from multiprocessing import Process, Queue

import cv2
import numpy as np
import oss2
import requests
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
from PIL import Image

# Project imports: relative order kept as in the original so that the two
# star-imports shadow each other (and the names above) exactly as before.
from utilsK.sendUtils import *
from utilsK.masterUtils import create_logFile, wrtiteLog, writeELK_log, send_kafka
from voduploadsdk.UploadVideoRequest import UploadVideoRequest
from voduploadsdk.AliyunVodUtils import *
from voduploadsdk.AliyunVodUploader import AliyunVodUploader
  20. ##for CeKanYuan
  21. #10月21日,通过图像名称判断,是那个平台。方式不好。
  22. #10月22日,改成访问固定的地址,从地址中读取,平台的名称与地址。每隔2分钟访问一次。
  23. #3月18日,采用OSS阿里云存储桶
  24. #platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
  25. platform_query_url='SendLog/platformQuery.json'
  26. api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
  27. #api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
  28. ##这套名字,是联通的。
  29. name_dic={
  30. "排口":"入河、湖排口",
  31. "疑似污口": "入河、湖排口",
  32. "水生植被": "水生植物",
  33. "漂浮物": "有大面积漂物",
  34. "结束": "结束",
  35. '其它' :'其它'
  36. }
  37. ## for TH river
  38. ##这套代码是河长制度的。
  39. nameID_dic={
  40. "排口":'00000',
  41. "疑似污口": '8378',
  42. "水生植被": '8380',
  43. "漂浮物": '8368',
  44. "结束":'9999',
  45. "其它":'8888'
  46. }
  47. msg_dict_off={
  48. "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
  49. "biz_id":"hehuzhang",#业务标识
  50. "mod_id":"ai",#模型标识
  51. "status":"running",#任务状态
  52. "type":str(1),#数据类型:1图片 2视频
  53. "error":str(9999),#错误信息
  54. "progressbar":"None",
  55. "results":[#问题结果
  56. {
  57. "original_url":"",#原图地址
  58. "sign_url":"",#AI标记地址
  59. "category_id":"",#分类标识
  60. "description":"",#问题描述
  61. "time":"",#时间戳
  62. }
  63. ]
  64. }
  65. msg_dict_on={
  66. "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
  67. "biz_id":"hehuzhang",#业务标识
  68. "mod_id":"ai",#模型标识
  69. "status":"running",#任务状态
  70. "type":str(2),#数据类型:1图片 2视频
  71. "error":str(9999),#错误信息
  72. "progressbar":"None",
  73. "results":[#问题结果
  74. {
  75. "original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
  76. "sign_url":"",#识别后视频地址
  77. }
  78. ]
  79. }
  80. def mintor_offline_ending(parIn):
  81. indir,server,topic,fp_log = parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']
  82. par_kafka={};par_kafka['server']=server;par_kafka['topic']=topic;
  83. logger = parIn['logger'];thread='Send-tranfer-oss:mintor-offline-ending'
  84. time_interval = parIn['timeInterval']
  85. ###轮询image_tmp的文件夹,每10s一次,一旦产生离线结束标志,则不停地发送没30秒heartbeat信号。
  86. producer = KafkaProducer(
  87. bootstrap_servers=par_kafka['server'],#tencent yun
  88. value_serializer=lambda v: v.encode('utf-8'),
  89. metadata_max_age_ms=120000,
  90. )
  91. outStrList={}
  92. writeELK_log(msg='child processs starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  93. time_ss0=time.time()
  94. while True:
  95. filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
  96. filelist = filelist_AI
  97. off_msgs=[]
  98. for filename in filelist[0:]:
  99. filename_base = os.path.basename(filename)
  100. ##解析文件名
  101. typename,msgId,onLineType = parse_filename_for_oss(filename_base)
  102. if (onLineType=='off') and (typename=='结束'):
  103. off_msgs.append(msgId)
  104. for msgId in off_msgs:
  105. msg_heart = copy.deepcopy(msg_dict_off)
  106. msg_heart['status']='running'
  107. msg_heart["msg_id"]=msgId
  108. msg_heart = json.dumps(msg_heart, ensure_ascii=False)
  109. outStrList['success']= '----- send heartBeat in Transfer success, msg:%s '%(msgId)
  110. outStrList['failure']='----- kafka error when sending heartBeat in Transfer '
  111. outStrList['Refailure']='----- kafka error when Re-sending heartBeat in Transfer '
  112. send_kafka(producer,par_kafka,msg_heart,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
  113. time.sleep(time_interval)
  114. time_ss1=time.time()
  115. if time_ss1 - time_ss0>120:
  116. outstrs = 'child process sleeping:%f s '%(time_ss1-time_ss0)
  117. writeELK_log(msg=outstrs,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  118. time_ss0=time_ss1
  119. def test5(par):
  120. indir,outdir,logdir,jsonDir = par['indir'],par['outdir'],par['logdir'],par['jsonDir']
  121. hearBeatTimeMs = par['hearBeatTimeMs']
  122. videoBakDir,ossPar,vodPar,kafkaPar = par['videoBakDir'], par['ossPar'],par['vodPar'],par['kafkaPar']
  123. time0_0 = time.time();logname='SendPics.log';thread='Send-tranfer-oss:main'
  124. fp_log=create_logFile(logdir=logdir,name=logname)
  125. logger=logdir.replace('/','.')+'.'+logname
  126. writeELK_log(msg='Send_tranfer_oss process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  127. parIn={};####给心跳信号发射的子进程参数
  128. parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']=indir,kafkaPar['boostServer'],kafkaPar['topic'],fp_log
  129. parIn['timeInterval'] = hearBeatTimeMs;parIn['logger']=logger
  130. HeartProcess=Process(target=mintor_offline_ending,name='process-sendHeartOnly',args=(parIn,))
  131. HeartProcess.start()
  132. ifind=0
  133. time0_0 = time.time()
  134. producer = KafkaProducer(
  135. bootstrap_servers=kafkaPar['boostServer'],#tencent yun
  136. value_serializer=lambda v: v.encode('utf-8'),
  137. metadata_max_age_ms=120000,
  138. )
  139. ###登陆准备存储桶
  140. auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
  141. # Endpoint以杭州为例,其它Region请按实际情况填写。
  142. bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
  143. ##VOD
  144. clt = init_vod_client(vodPar['AId'], vodPar['ASt'])
  145. uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])
  146. writeELK_log(msg='Load Parameter over',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  147. par_heart={};outStrList={}
  148. time_b0=time.time()
  149. while True:
  150. filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
  151. filelist=[]
  152. for filename in filelist_AI:
  153. filename_base = os.path.basename(filename)
  154. typename,msgId,onLineType = parse_filename_for_oss(filename_base)
  155. if typename in ["排口","其它"]:
  156. continue
  157. filelist.append(filename)
  158. if len(filelist)!=0:
  159. time0 = time.time()
  160. for filename in filelist[0:]:
  161. filename_base = os.path.basename(filename)
  162. ##解析文件名
  163. typename,msgId,onLineType = parse_filename_for_oss(filename_base)
  164. ##存储文件
  165. filename_OR=filename.replace('_AI.','_OR.')
  166. filename_AI_image = filename.replace('.txt','.jpg')
  167. filename_OR_image = filename_OR.replace('.txt','.jpg')
  168. taskInfos = lodaMsgInfos(jsonDir,msgId)
  169. oss_dir = taskInfos['results_base_dir']
  170. if typename in ["排口","其它"]:
  171. continue
  172. if typename not in ['结束','超时结束']:
  173. time_s1 = time.time()
  174. ObjectName_AI=os.path.join(oss_dir,os.path.basename(filename_AI_image))
  175. ObjectName_OR=os.path.join(oss_dir,os.path.basename(filename_OR_image))
  176. bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
  177. ret2=bucket.put_object_from_file(ObjectName_OR, filename_OR_image)
  178. outstr=' oss bucket upload %s %s %s '%('***'*3,'Send:',filename)
  179. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  180. writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  181. msg = copy.deepcopy(msg_dict_off)
  182. if onLineType!='off': msg['type']=str(2)
  183. else: msg['type']=str(1)
  184. msg['results'][0]['original_url']= ObjectName_OR
  185. msg['results'][0]['sign_url']= ObjectName_AI
  186. msg['results'][0]['category_id']= nameID_dic[typename]
  187. msg['results'][0]['description']= typename
  188. msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  189. msg = update_json(taskInfos,msg)
  190. time_s2 = time.time()
  191. else:
  192. time_s1 = time.time()
  193. if onLineType!='off':
  194. msg = copy.deepcopy(msg_dict_on)
  195. msg["msg_id"]=msgId ;msg['type']=str(2)
  196. msg['results'][0]['original_url']= "yourAddess"
  197. msg['results'][0]['sign_url']= "yourAddess"###最新的视频文件
  198. upCnt=1;upLoaded=False
  199. while upCnt<4:
  200. try:
  201. videoUrl= get_videoUurl(videoBakDir,msgId+'_AI.MP4')
  202. assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
  203. uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
  204. videoId = uploader.uploadLocalVideo(uploadVideoRequest)
  205. VideoId_AI=str(videoId['VideoId'])
  206. videoUrl= get_videoUurl(videoBakDir,msgId+'_OR.MP4')
  207. assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
  208. uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
  209. videoId = uploader.uploadLocalVideo(uploadVideoRequest)
  210. VideoId_OR=str(videoId['VideoId'])
  211. outstr=VideoId_OR+','+VideoId_AI
  212. writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  213. msg['results'][0]['sign_url']=VideoId_AI
  214. msg['results'][0]['original_url']=VideoId_OR
  215. upCnt=4;upLoaded=True
  216. except Exception as e:
  217. writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
  218. upCnt+=1;upLoaded=False
  219. if not upLoaded:
  220. msg['error']='video uploading failure'
  221. else:
  222. msg = copy.deepcopy(msg_dict_off)
  223. msg['type']=str(1)
  224. msg["msg_id"]=msgId
  225. msg['results'][0]['original_url']= taskInfos['offering_id']
  226. videoUrl= get_videoUurl(videoBakDir,msgId+'.MP4')
  227. assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
  228. upCnt=1;upLoaded=False
  229. while upCnt<4:
  230. try:
  231. uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
  232. videoId = uploader.uploadLocalVideo(uploadVideoRequest)
  233. outstr=' oss upload video over %s '%(str(videoId['VideoId']))
  234. writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  235. msg['results'][0]['sign_url']= str(videoId['VideoId'])###最新的视频文件
  236. upCnt=4;upLoaded=True
  237. except Exception as e:
  238. writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
  239. upCnt+=1;upLoaded=False
  240. if not upLoaded:
  241. msg['error']='video uploading failure'
  242. if upLoaded:
  243. if typename=='结束': msg["status"]="success"
  244. else: msg["status"]="success_timeout"
  245. else:
  246. msg["status"]='failed'
  247. time_s2 = time.time()
  248. msg = json.dumps(msg, ensure_ascii=False)
  249. future = producer.send(
  250. kafkaPar['topic'],
  251. msg
  252. )
  253. try:
  254. record_metadata = future.get()
  255. outstr='kafka send:%s msg:%s producer status:%s'%(onLineType,msg,producer.bootstrap_connected())
  256. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  257. writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  258. except Exception as e:
  259. outstr='kafka ERROR:%s'%(str(e))
  260. #outstr=wrtiteLog(fp_log,outstr);print( outstr);
  261. writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger)
  262. producer.close()
  263. producer = KafkaProducer(
  264. bootstrap_servers=kafkaPar['boostServer'],#tencent yun
  265. value_serializer=lambda v: v.encode('utf-8')
  266. )
  267. try:
  268. future = producer.send(kafkaPar['topic'], msg).get()
  269. except Exception as e:
  270. outstr='kafka resend ERROR:%s'%(str(e))
  271. #poutstr=wrtiteLog(fp_log,outstr);print( outstr);
  272. writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='ERROR',line=sys._getframe().f_lineno,logger=logger)
  273. time_s3 = time.time()
  274. ##上传后的图片,移走到另外一个文件夹###
  275. cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
  276. cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
  277. time_s4 = time.time()
  278. print('-'*50)
  279. else:
  280. time.sleep(1)
  281. time_b1=time.time()
  282. if time_b1-time_b0>120:
  283. writeELK_log(msg='send main process sleeping',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
  284. time_b0=time_b1
  285. fp_log.close()
  286. if __name__=='__main__':
  287. '''
  288. indir='problems/images_tmp'
  289. outdir='problems/images_save'
  290. logdir='logs/send'
  291. jsonDir = 'mintors/kafka/'
  292. videoBakDir='problems/videos_save'
  293. ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com',
  294. 'AId':'LTAI5tSJ62TLMUb4SZuf285A',
  295. 'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7',
  296. 'bucketName':'ta-tech-image',
  297. }
  298. vodPar={
  299. 'AId':'LTAI5tE7KWN9fsuGU7DyfYF4',
  300. 'ASt':'yPPCyfsqWgrTuoz5H4sisY0COclx8E',
  301. }
  302. kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'}
  303. '''
  304. masterFile="conf/send_oss.json"
  305. assert os.path.exists(masterFile)
  306. with open(masterFile,'r') as fp:
  307. par=json.load(fp)
  308. test5(par)