- from PIL import Image
- import numpy as np
- import cv2
- import base64
- import io,os
- import requests
- import time,json
- import string,random
- import glob,string,sys
- from multiprocessing import Process,Queue
- import oss2,copy
- from kafka import KafkaProducer, KafkaConsumer
- from utilsK.sendUtils import *
-
- from utilsK.masterUtils import create_logFile,wrtiteLog,writeELK_log,send_kafka
- from voduploadsdk.UploadVideoRequest import UploadVideoRequest
- from voduploadsdk.AliyunVodUtils import *
- from voduploadsdk.AliyunVodUploader import AliyunVodUploader
- import hashlib
- from kafka.errors import kafka_errors
- ##for CeKanYuan
- #Oct 21: decided which platform an image came from based on the image name; not a good approach.
- #Oct 22: changed to polling a fixed address every 2 minutes and reading the platform names and addresses from it.
- #Mar 18: switched to Aliyun OSS buckets for storage.
- #platform_query_url='http://47.96.182.154:9051/api/suanfa/getPlatformInfo'
- platform_query_url='SendLog/platformQuery.json'
- api = 'http://121.40.249.52:9050/api/taskFile/submitUAVKHQuestion'
- #api = 'http://47.98.157.120:9040/api/taskFile/submitUAVKHQuestion'
-
- ##These display names are the ones used by the China Unicom platform.
- name_dic={
- "排口":"入河、湖排口",
- "疑似污口": "入河、湖排口",
- "水生植被": "水生植物",
- "漂浮物": "有大面积漂物",
- "结束": "结束",
- '其它' :'其它'
- }
- ## for TH river
- ##These category codes are the ones used by the river-chief (河长制) system.
- nameID_dic={
- "排口":'00000',
- "疑似污口": '8378',
- "水生植被": '8380',
- "漂浮物": '8368',
- "结束":'9999',
- "其它":'8888'
- }
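- ##Example lookups (values from the two dicts above): name_dic["漂浮物"] gives the
- ##display name "有大面积漂物", and nameID_dic["漂浮物"] gives the category ID '8368'
- ##that is sent as category_id in the result messages below.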
-
-
- msg_dict_off={
- "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
- "biz_id":"hehuzhang",#业务标识
- "mod_id":"ai",#模型标识
- "status":"running",#任务状态
- "type":str(1),#数据类型:1图片 2视频
- "error":str(9999),#错误信息
- "progressbar":"None",
- "results":[#问题结果
- {
- "original_url":"",#原图地址
- "sign_url":"",#AI标记地址
- "category_id":"",#分类标识
- "description":"",#问题描述
- "time":"",#时间戳
- }
- ]
- }
-
- msg_dict_on={
- "msg_id":"bblvgyntTsZCamqjuLArkiSYIbKXEeWx",#消息ID标识
- "biz_id":"hehuzhang",#业务标识
- "mod_id":"ai",#模型标识
- "status":"running",#任务状态
- "type":str(2),#数据类型:1图片 2视频
- "error":str(9999),#错误信息
- "progressbar":"None",
- "results":[#问题结果
- {
- "original_url":"",#原视频地址(离线识别时为空不传,实时识别时需要上传)
- "sign_url":"",#识别后视频地址
- }
- ]
- }
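- ##Minimal usage sketch (illustration only, kept commented out): a template above is
- ##deep-copied, one result entry is filled in, the dict is serialized with
- ##ensure_ascii=False so the Chinese text survives, and the string is published to
- ##Kafka, mirroring what test5() does below. The topic name and all field values
- ##here are placeholders, and the module-level imports and dicts are assumed available.
- # msg = copy.deepcopy(msg_dict_off)
- # msg['msg_id'] = 'example-task-id'                              #placeholder task id
- # msg['results'][0]['original_url'] = 'results/example_OR.jpg'   #OSS key of the raw frame (placeholder)
- # msg['results'][0]['sign_url'] = 'results/example_AI.jpg'       #OSS key of the annotated frame (placeholder)
- # msg['results'][0]['category_id'] = nameID_dic['漂浮物']
- # msg['results'][0]['description'] = '漂浮物'
- # msg['results'][0]['time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- # producer.send('alg-task-results', json.dumps(msg, ensure_ascii=False)).get()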
-
-
-
- def mintor_offline_ending(parIn):
-
- indir,server,topic,fp_log = parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']
- par_kafka={};par_kafka['server']=server;par_kafka['topic']=topic;
- logger = parIn['logger'];thread='Send-tranfer-oss:mintor-offline-ending'
- time_interval = parIn['timeInterval']
- ###Poll the image_tmp folder every 10 s; once an offline-end flag appears, keep sending a heartbeat signal every 30 s.
- producer = KafkaProducer(
- bootstrap_servers=par_kafka['server'],#tencent yun
- value_serializer=lambda v: v.encode('utf-8'),
- metadata_max_age_ms=120000,
- )
- outStrList={}
- writeELK_log(msg='child process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- time_ss0=time.time()
- while True:
-
- filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
- filelist = filelist_AI
-
- off_msgs=[]
-
- for filename in filelist[0:]:
- filename_base = os.path.basename(filename)
- ##parse the file name
- typename,msgId,onLineType = parse_filename_for_oss(filename_base)
- if (onLineType=='off') and (typename=='结束'):
- off_msgs.append(msgId)
-
- for msgId in off_msgs:
- msg_heart = copy.deepcopy(msg_dict_off)
- msg_heart['status']='running'
- msg_heart["msg_id"]=msgId
- msg_heart = json.dumps(msg_heart, ensure_ascii=False)
-
- outStrList['success']= '----- send heartBeat in Transfer success, msg:%s '%(msgId)
- outStrList['failure']='----- kafka error when sending heartBeat in Transfer '
- outStrList['Refailure']='----- kafka error when Re-sending heartBeat in Transfer '
-
-
- send_kafka(producer,par_kafka,msg_heart,outStrList,fp_log,line=sys._getframe().f_lineno,logger=logger,thread=thread );
-
- time.sleep(time_interval)
- time_ss1=time.time()
- if time_ss1 - time_ss0>120:
- outstrs = 'child process sleeping:%f s '%(time_ss1-time_ss0)
- writeELK_log(msg=outstrs,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- time_ss0=time_ss1
-
-
-
-
-
-
- def test5(par):
-
- indir,outdir,logdir,jsonDir = par['indir'],par['outdir'],par['logdir'],par['jsonDir']
- hearBeatTimeMs = par['hearBeatTimeMs']
- videoBakDir,ossPar,vodPar,kafkaPar = par['videoBakDir'], par['ossPar'],par['vodPar'],par['kafkaPar']
-
-
-
- time0_0 = time.time();logname='SendPics.log';thread='Send-tranfer-oss:main'
- fp_log=create_logFile(logdir=logdir,name=logname)
-
- logger=logdir.replace('/','.')+'.'+logname
- writeELK_log(msg='Send_tranfer_oss process starts',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
-
- parIn={};####parameters for the child process that emits heartbeat signals
- parIn['indir'],parIn['server'],parIn['topic'] ,parIn['fp_log']=indir,kafkaPar['boostServer'],kafkaPar['topic'],fp_log
- parIn['timeInterval'] = hearBeatTimeMs;parIn['logger']=logger
- HeartProcess=Process(target=mintor_offline_ending,name='process-sendHeartOnly',args=(parIn,))
- HeartProcess.start()
-
-
- ifind=0
- time0_0 = time.time()
-
- producer = KafkaProducer(
- bootstrap_servers=kafkaPar['boostServer'],#tencent yun
- value_serializer=lambda v: v.encode('utf-8'),
- metadata_max_age_ms=120000,
- )
-
-
-
-
- ###authenticate and set up the OSS bucket
- auth = oss2.Auth(ossPar['AId'], ossPar['ASt'])
- # The endpoint here uses the Hangzhou region as an example; fill in the actual region as appropriate.
- bucket = oss2.Bucket(auth, ossPar['Epoint'], ossPar['bucketName'])
-
- ##VOD
- clt = init_vod_client(vodPar['AId'], vodPar['ASt'])
- uploader = AliyunVodUploader(vodPar['AId'], vodPar['ASt'])
-
-
- writeELK_log(msg='Load Parameter over',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- par_heart={};outStrList={}
- time_b0=time.time()
- while True:
-
- filelist_AI = sorted(glob.glob('%s/*_AI.txt'%(indir)),key=os.path.getmtime)
- filelist=[]
- for filename in filelist_AI:
- filename_base = os.path.basename(filename)
- typename,msgId,onLineType = parse_filename_for_oss(filename_base)
- if typename in ["排口","其它"]:
- continue
- filelist.append(filename)
-
- if len(filelist)!=0:
- time0 = time.time()
- for filename in filelist[0:]:
- filename_base = os.path.basename(filename)
- ##parse the file name
- typename,msgId,onLineType = parse_filename_for_oss(filename_base)
-
- ##derive the stored file names
- filename_OR=filename.replace('_AI.','_OR.')
-
- filename_AI_image = filename.replace('.txt','.jpg')
- filename_OR_image = filename_OR.replace('.txt','.jpg')
-
- taskInfos = lodaMsgInfos(jsonDir,msgId)
- oss_dir = taskInfos['results_base_dir']
-
- if typename in ["排口","其它"]:
- continue
- if typename not in ['结束','超时结束']:
-
- time_s1 = time.time()
- ObjectName_AI=os.path.join(oss_dir,os.path.basename(filename_AI_image))
- ObjectName_OR=os.path.join(oss_dir,os.path.basename(filename_OR_image))
- bucket.put_object_from_file(ObjectName_AI, filename_AI_image)
- ret2=bucket.put_object_from_file(ObjectName_OR, filename_OR_image)
-
- outstr=' oss bucket upload %s %s %s '%('***'*3,'Send:',filename)
- #outstr=wrtiteLog(fp_log,outstr);print( outstr);
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
-
- msg = copy.deepcopy(msg_dict_off)
- if onLineType!='off': msg['type']=str(2)
- else: msg['type']=str(1)
- msg['results'][0]['original_url']= ObjectName_OR
- msg['results'][0]['sign_url']= ObjectName_AI
- msg['results'][0]['category_id']= nameID_dic[typename]
- msg['results'][0]['description']= typename
- msg['results'][0]['time']= time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- msg = update_json(taskInfos,msg)
- time_s2 = time.time()
- else:
- time_s1 = time.time()
- if onLineType!='off':
- msg = copy.deepcopy(msg_dict_on)
- msg["msg_id"]=msgId ;msg['type']=str(2)
- msg['results'][0]['original_url']= "yourAddess"
- msg['results'][0]['sign_url']= "yourAddess"###最新的视频文件
- upCnt=1;upLoaded=False
- while upCnt<4:
- try:
- videoUrl= get_videoUurl(videoBakDir,msgId+'_AI.MP4')
- assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
- uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
- videoId = uploader.uploadLocalVideo(uploadVideoRequest)
- VideoId_AI=str(videoId['VideoId'])
-
- videoUrl= get_videoUurl(videoBakDir,msgId+'_OR.MP4')
- assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
- uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
- videoId = uploader.uploadLocalVideo(uploadVideoRequest)
- VideoId_OR=str(videoId['VideoId'])
-
- outstr=VideoId_OR+','+VideoId_AI
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- msg['results'][0]['sign_url']=VideoId_AI
- msg['results'][0]['original_url']=VideoId_OR
- upCnt=4;upLoaded=True
- except Exception as e:
- writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
- upCnt+=1;upLoaded=False
- if not upLoaded:
- msg['error']='video uploading failure'
-
-
- else:
- msg = copy.deepcopy(msg_dict_off)
- msg['type']=str(1)
- msg["msg_id"]=msgId
- msg['results'][0]['original_url']= taskInfos['offering_id']
- videoUrl= get_videoUurl(videoBakDir,msgId+'.MP4')
- assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl)
- upCnt=1;upLoaded=False
- while upCnt<4:
- try:
- uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo')
- videoId = uploader.uploadLocalVideo(uploadVideoRequest)
- outstr=' oss upload video over %s '%(str(videoId['VideoId']))
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- msg['results'][0]['sign_url']= str(videoId['VideoId'])###the latest uploaded video file
- upCnt=4;upLoaded=True
- except Exception as e:
- writeELK_log(msg='video uploading error:%s, times:%d'%(e,upCnt),fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger);
- upCnt+=1;upLoaded=False
- if not upLoaded:
- msg['error']='video uploading failure'
- if upLoaded:
- if typename=='结束': msg["status"]="success"
- else: msg["status"]="success_timeout"
- else:
- msg["status"]='failed'
- time_s2 = time.time()
-
- msg = json.dumps(msg, ensure_ascii=False)
- future = producer.send(
- kafkaPar['topic'],
- msg
- )
-
-
- try:
- record_metadata = future.get()
- outstr='kafka send:%s msg:%s producer status:%s'%(onLineType,msg,producer.bootstrap_connected())
- #outstr=wrtiteLog(fp_log,outstr);print( outstr);
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- except Exception as e:
- outstr='kafka ERROR:%s'%(str(e))
- #outstr=wrtiteLog(fp_log,outstr);print( outstr);
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='WARNING',line=sys._getframe().f_lineno,logger=logger)
- producer.close()
-
- producer = KafkaProducer(
- bootstrap_servers=kafkaPar['boostServer'],#tencent yun
- value_serializer=lambda v: v.encode('utf-8')
- )
- try:
- future = producer.send(kafkaPar['topic'], msg).get()
- except Exception as e:
- outstr='kafka resend ERROR:%s'%(str(e))
- #poutstr=wrtiteLog(fp_log,outstr);print( outstr);
- writeELK_log(msg=outstr,fp=fp_log,thread=thread,level='ERROR',line=sys._getframe().f_lineno,logger=logger)
-
-
- time_s3 = time.time()
-
- ##move the uploaded images to another folder###
- cmd = 'mv \'%s\' \'%s\' '%(filename,outdir); os.system(cmd)
- cmd = 'mv \'%s\' \'%s\' '%(filename_OR,outdir); os.system(cmd)
- time_s4 = time.time()
- print('-'*50)
-
- else:
- time.sleep(1)
- time_b1=time.time()
- if time_b1-time_b0>120:
- writeELK_log(msg='send main process sleeping',fp=fp_log,thread=thread,level='INFO',line=sys._getframe().f_lineno,logger=logger)
- time_b0=time_b1
- fp_log.close()
-
- if __name__=='__main__':
- '''
- indir='problems/images_tmp'
- outdir='problems/images_save'
- logdir='logs/send'
- jsonDir = 'mintors/kafka/'
- videoBakDir='problems/videos_save'
- ossPar={'Epoint':'http://oss-cn-shanghai.aliyuncs.com',
- 'AId':'LTAI5tSJ62TLMUb4SZuf285A',
- 'ASt':'MWYynm30filZ7x0HqSHlU3pdLVNeI7',
- 'bucketName':'ta-tech-image',
- }
- vodPar={
- 'AId':'LTAI5tE7KWN9fsuGU7DyfYF4',
- 'ASt':'yPPCyfsqWgrTuoz5H4sisY0COclx8E',
-
- }
- kafkaPar={'boostServer':['101.132.127.1:19092'],'topic':'alg-task-results'}
- '''
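- #Expected shape of conf/send_oss.json -- a sketch assembled from the commented-out
- #parameters above and the keys read in test5(); the credential values are
- #placeholders and the hearBeatTimeMs value (seconds between heartbeats) is assumed:
- # {
- #   "indir": "problems/images_tmp",
- #   "outdir": "problems/images_save",
- #   "logdir": "logs/send",
- #   "jsonDir": "mintors/kafka/",
- #   "videoBakDir": "problems/videos_save",
- #   "hearBeatTimeMs": 30,
- #   "ossPar": {"Epoint": "http://oss-cn-shanghai.aliyuncs.com",
- #              "AId": "<AccessKeyId>", "ASt": "<AccessKeySecret>",
- #              "bucketName": "ta-tech-image"},
- #   "vodPar": {"AId": "<AccessKeyId>", "ASt": "<AccessKeySecret>"},
- #   "kafkaPar": {"boostServer": ["101.132.127.1:19092"], "topic": "alg-task-results"}
- # }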
- masterFile="conf/send_oss.json"
- assert os.path.exists(masterFile)
- with open(masterFile,'r') as fp:
- par=json.load(fp)
-
- test5(par)