@@ -106,7 +106,7 @@ def mintor_offline_ending(parIn): | |||
filelist = filelist_AI | |||
off_msgs=[] | |||
for filename in filelist[0:]: | |||
filename_base = os.path.basename(filename) | |||
##解析文件名 | |||
@@ -246,13 +246,14 @@ def test5(par): | |||
upCnt=1;upLoaded=False | |||
while upCnt<4: | |||
try: | |||
videoUrl=os.path.join(videoBakDir,msgId+'_AI.MP4') | |||
videoUrl= get_videoUurl(videoBakDir,msgId+'_AI.MP4') | |||
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl) | |||
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo') | |||
videoId = uploader.uploadLocalVideo(uploadVideoRequest) | |||
VideoId_AI=str(videoId['VideoId']) | |||
videoUrl=os.path.join(videoBakDir,msgId+'_OR.MP4') | |||
videoUrl= get_videoUurl(videoBakDir,msgId+'_OR.MP4') | |||
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl) | |||
uploadVideoRequest = UploadVideoRequest(videoUrl, 'offLineVideo') | |||
videoId = uploader.uploadLocalVideo(uploadVideoRequest) | |||
VideoId_OR=str(videoId['VideoId']) | |||
@@ -274,7 +275,8 @@ def test5(par): | |||
msg['type']=str(1) | |||
msg["msg_id"]=msgId | |||
msg['results'][0]['original_url']= taskInfos['offering_id'] | |||
videoUrl=os.path.join(videoBakDir,msgId+'.MP4') | |||
videoUrl= get_videoUurl(videoBakDir,msgId+'.MP4') | |||
assert os.path.exists(videoUrl) , '%s not exists'%(videoUrl) | |||
upCnt=1;upLoaded=False | |||
while upCnt<4: | |||
try: | |||
@@ -291,7 +293,7 @@ def test5(par): | |||
msg['error']='video uploading failure' | |||
if upLoaded: | |||
if typename=='结束': msg["status"]="success" | |||
else: msg["status"]="timeout" | |||
else: msg["status"]="success_timeout" | |||
else: | |||
msg["status"]='failed' | |||
time_s2 = time.time() |
@@ -22,7 +22,7 @@ from kafka.errors import kafka_errors | |||
import utilsK | |||
from utilsK.GPUtils import * | |||
from utilsK.masterUtils import * | |||
from utilsK.sendUtils import create_status_msg,update_json | |||
from utilsK.sendUtils import create_status_msg,update_json,get_today | |||
#from utilsK.modelEval import onlineModelProcsss | |||
import random,string | |||
@@ -152,19 +152,20 @@ def onlineModelProcess(parIn ): | |||
childCallback.send('####model load success####') | |||
print('#####line153:',outVideoDir,video_flag) | |||
os.makedirs( os.path.join(outVideoDir,get_today()) ,exist_ok=True) | |||
if (outVideoDir!='NO') : ####2022.06.27新增在线任务也要传AI视频和原始视频 | |||
if video_flag: | |||
msg_id = streamName.split('-')[2] | |||
save_path = os.path.join(outVideoDir,msg_id+'.MP4') | |||
save_path = os.path.join(outVideoDir,get_today(),msg_id+'.MP4') | |||
vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH)) | |||
if vid_writer.isOpened(): outstr='touch video success:%s'%(save_path);level='INFO' | |||
else:outstr='touch video failed:%s'%(save_path);level='ERROR' | |||
writeELK_log(msg=outstr,fp=fp_log,level=level,line=sys._getframe().f_lineno,logger=logger) | |||
else: | |||
msg_id = streamName.split('-')[2] | |||
save_path_OR = os.path.join(outVideoDir,msg_id+'_OR.MP4') | |||
save_path_OR = os.path.join(outVideoDir,get_today(),msg_id+'_OR.MP4') | |||
vid_writer_OR = cv2.VideoWriter(save_path_OR, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH)) | |||
save_path_AI = os.path.join(outVideoDir,msg_id+'_AI.MP4') | |||
save_path_AI = os.path.join(outVideoDir,get_today(),msg_id+'_AI.MP4') | |||
vid_writer_AI = cv2.VideoWriter(save_path_AI, cv2.VideoWriter_fourcc(*'mp4v'), fps, (outW,outH)) | |||
if vid_writer_AI.isOpened() and vid_writer_OR.isOpened() :outstr='touch video success:%s,%s'%(save_path_OR,save_path_AI);level='INFO' | |||
else:outstr='touch video failed:%s,%s, fps:%d ,%d , %d'%(save_path_OR,save_path_AI,fps,outW,outH);level='ERROR' | |||
@@ -404,7 +405,7 @@ def get_msg_from_kafka(par): | |||
try: | |||
taskInfos['inSource']= taskInfos['offering_id']; | |||
taskInfos['outSource']= 'NO' | |||
taskStatus['offLine'].put( taskInfos ) | |||
save_message(par['kafka'],taskInfos) | |||
@@ -474,7 +475,7 @@ def detector(par): | |||
else:###3.1.2-没有显卡 | |||
##判断有没有显卡上面都是离线进程的 | |||
cuda_pid = get_potential_gpu(gpuStatus,taskStatus['pidInfos']) | |||
print('###line478:',cuda_pid) | |||
if cuda_pid:#3.1.2.1 - ##如果有可以杀死的进程 | |||
cuda = cuda_pid['cuda'] | |||
pids = cuda_pid['pids'] |
@@ -25,8 +25,8 @@ def producer_demo(): | |||
#pull_channel = "rtmp://live.play.t-aaron.com/live/THSA" | |||
#push_channel = 'rtmp://live.push.t-aaron.com/live/THSB' | |||
#pull_channel = 'rtmp://live.play.t-aaron.com/live/THSAa_hd' | |||
pull_channel = 'http://live.play.t-aaron.com/live/THSAl_hd.m3u8' | |||
push_channel = "rtmp://live.push.t-aaron.com/live/THSBd" | |||
pull_channel = 'http://live.play.t-aaron.com/live/THSAa_hd.m3u8' | |||
push_channel = "rtmp://live.push.t-aaron.com/live/THSBa" | |||
else: | |||
pull_channel = "rtmp://demoplay.yunhengzhizao.cn/live/THSA_HD5M" | |||
push_channel = "rtmp://127.0.0.1:1935/live/test" |
@@ -18,13 +18,22 @@ from kafka import KafkaProducer, KafkaConsumer | |||
from voduploadsdk.UploadVideoRequest import UploadVideoRequest | |||
from voduploadsdk.AliyunVodUtils import * | |||
from voduploadsdk.AliyunVodUploader import AliyunVodUploader | |||
from datetime import datetime, date, timedelta | |||
def get_today(): | |||
return date.today().strftime("%Y-%m-%d") | |||
def get_yesterday(beforeday=-1): | |||
return (date.today() + timedelta(days =beforeday)).strftime("%Y-%m-%d") | |||
def get_videoUurl(videoBakDir,filename): | |||
###七天时间内 | |||
potentialUrls=[ os.path.join( videoBakDir,get_yesterday(beforeday=-x),filename) for x in range(7) ] | |||
existsList=[os.path.exists(x ) for x in potentialUrls] | |||
for i,flag in enumerate(existsList): | |||
if flag: return potentialUrls[i] | |||
return potentialUrls[0] | |||
def get_play_info(clt, videoId): | |||
request = GetPlayInfoRequest.GetPlayInfoRequest() | |||
request.set_accept_format('JSON') |