# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Queue
from os.path import join, exists
from threading import Thread
from time import sleep, time
from traceback import format_exc

from loguru import logger

from bean.Feedback import upload_http_result
from enums.ExceptionEnum import UploadStrExceptionType
from enums.StatusEnum import UploadTaskStatusType, UploadStatusType
from exception.CustomerException import ServiceException
from util.AliyunUtil import OssUtil, VodUtil
from util.FileUtil import get_all_images, get_all_videos, get_file_name, delete_file, create_dir, \
    move_file, remove_after_create
from util.LogUtils import init_log
from util.QueUtil import get_no_block_queue, put_queue, clear_queue
from util.ThreadUtil import ThreadPoolExecutorNew
from util.TimeUtils import now_date_to_str, YMDHMS

# Upper bound (seconds) for both the upload loop and the heartbeat thread: 12 hours.
_TASK_TIMEOUT_SECONDS = 43200
# How long (seconds) stop_hd_thread waits for the heartbeat thread to exit.
_HEARTBEAT_JOIN_TIMEOUT_SECONDS = 60


class HttpUploadFileProcess(Process):
    """Child process that uploads local image and video files and reports progress.

    Images are submitted to an ``OssUtil`` uploader and videos to a ``VodUtil``
    uploader, each on its own thread pool.  While uploads run, a heartbeat
    thread periodically pushes the current per-file state through
    ``upload_http_result``.  A ``{"command": "stop"}`` event sent with
    ``sendEvent`` cancels the task.
    """

    __slots__ = ('__fb_queue', '__requestId', 'videoPath', 'imagePath', 'image_backup', 'video_backup',
                 '__current_time', '__callback_url', '__service_config', '__event_queue')

    def __init__(self, fb_queue, service_config, callback_url, request_id):
        """Store the task configuration and report the task as WAITING.

        Args:
            fb_queue: feedback queue handed to ``upload_http_result``.
            service_config: configuration mapping; this class reads
                ``["file"]["backup"]``, ``["file"]["videoPath"]``,
                ``["file"]["imagePath"]`` and ``["base_dir"]``.
            callback_url: callback address handed to ``upload_http_result``.
            request_id: identifier used in log lines and callbacks.
        """
        super().__init__()
        self.__fb_queue = fb_queue
        self.__requestId = request_id
        self.__event_queue = Queue()
        self.__service_config = service_config
        self.__callback_url = callback_url
        self.__current_time = now_date_to_str(YMDHMS)
        # Files still present after the run are moved below <backup>/<timestamp>/.
        backup = join(service_config["file"]["backup"], self.__current_time)
        self.videoPath, self.imagePath = service_config["file"]['videoPath'], service_config["file"]['imagePath']
        self.image_backup, self.video_backup = join(backup, 'images'), join(backup, 'videos')
        upload_http_result(fb_queue, callback_url, request_id, status=UploadTaskStatusType.WAITING.value[0])

    def sendEvent(self, result):
        """Put a control event (e.g. ``{"command": "stop"}``) on the event queue."""
        put_queue(self.__event_queue, result, timeout=2, is_throw_ex=True)

    @staticmethod
    def uploadImage(oss, uploadPath, filePath):
        """Upload one image via ``oss.resumable_upload``.

        On failure the exception is stored on ``oss.exception``, logged and
        re-raised so that the owning future reports it.
        """
        try:
            oss.resumable_upload(uploadPath, filePath)
        except Exception as e:
            oss.exception = e
            logger.error("oss上传文件异常: {}, uploadPath:{}, filePath:{}", format_exc(), uploadPath, filePath)
            raise

    @staticmethod
    def uploadVideo(vod, file_title, filePath):
        """Upload one video via ``vod.get_play_url``.

        On failure the exception is stored on ``vod.exception``, logged and
        re-raised so that the owning future reports it.
        """
        try:
            vod.get_play_url(filePath, file_title)
        except Exception as e:
            vod.exception = e
            logger.error("oss上传文件异常: {}, file_title:{}, filePath:{}", format_exc(), file_title, filePath)
            raise

    @staticmethod
    def move_method(file, backup):
        """Move ``file`` into the ``backup`` directory."""
        move_file(file, backup)

    @staticmethod
    def hdThread(imageList, videoList, fb_queue, callback_url, hb_queue, requestId):
        """Heartbeat loop: report RUNNING state until stopped or timed out.

        Polls ``hb_queue`` once per second for a ``{"command": "stop"}``
        message and sends a RUNNING callback carrying the live ``imageList``
        and ``videoList`` on every fifth iteration.  Any exception is logged
        and ends the loop.
        """
        try:
            logger.info("启动文件上传心跳线程, requestId:{}", requestId)
            start_time = time()
            count = 0
            while True:
                if time() - start_time > _TASK_TIMEOUT_SECONDS:
                    logger.error("心跳线程运行超时!!!!requestId:{}", requestId)
                    break
                command = get_no_block_queue(hb_queue)
                if command is not None:
                    if "stop" == command.get("command"):
                        logger.info("开始停止心跳线程!!!!requestId:{}", requestId)
                        break
                if count % 5 == 0:
                    upload_http_result(fb_queue, callback_url, requestId,
                                       status=UploadTaskStatusType.RUNNING.value[0],
                                       imageList=imageList, videoList=videoList)
                    count = 0
                count += 1
                sleep(1)
        except Exception:
            logger.error("心跳线程异常:{}, requestId:{}", format_exc(), requestId)
        logger.info("心跳线程停止完成!requestId:{}", requestId)

    def upload_method(self, service_config, video_path_array, videoTask, videoList, vt, image_path_array,
                      imageTask, imageList, it):
        """Submit every video and image for upload and register its bookkeeping.

        For each file a fresh uploader is created, the upload is submitted to
        the matching pool (``vt`` for videos, ``it`` for images), the task
        dict gets ``file name -> [future, uploader, path]`` and the list gets
        a WAITING status entry.  ``videoTask``, ``videoList``, ``imageTask``
        and ``imageList`` are mutated in place.
        """
        current_time = self.__current_time
        for i in video_path_array:
            vod = VodUtil(service_config["base_dir"])
            filename = get_file_name(i)
            video_result = vt.submit(self.uploadVideo, vod, filename, i)
            videoTask[filename] = [video_result, vod, i]
            videoList.append({"fileName": filename, "videoUrl": "", "status": UploadStatusType.WAITING.value[0],
                              "progress": vod.get_progress()})
        for i in image_path_array:
            oss = OssUtil(service_config["base_dir"])
            fileName = get_file_name(i)
            # Images are stored remotely under a per-run timestamp prefix.
            uploadPath = "%s/%s" % (current_time, fileName)
            image_result = it.submit(self.uploadImage, oss, uploadPath, i)
            imageTask[fileName] = [image_result, oss, i]
            imageList.append({"fileName": fileName, "imageUrl": "", "status": UploadStatusType.WAITING.value[0],
                              "progress": oss.progress})

    def start_hd_thread(self, imageList, videoList, fb_queue, callback_url, hb_queue, requestId):
        """Start the heartbeat thread as a daemon and return it."""
        # daemon= replaces Thread.setDaemon(), which is deprecated since Python 3.10.
        hd = Thread(target=self.hdThread,
                    args=(imageList, videoList, fb_queue, callback_url, hb_queue, requestId),
                    daemon=True)
        hd.start()
        return hd

    @staticmethod
    def stop_hd_thread(hb, hb_queue, requestId):
        """Ask the heartbeat thread to stop and wait up to 60 seconds for it.

        Does nothing when ``hb`` is ``None`` or has already finished.
        """
        if hb is not None and hb.is_alive():
            put_queue(hb_queue, {"command": "stop"}, timeout=2, is_throw_ex=False)
            hb.join(_HEARTBEAT_JOIN_TIMEOUT_SECONDS)
            # join() always returns None; is_alive() is the reliable way to
            # tell whether the join timed out.
            if hb.is_alive():
                logger.error("心跳线程停止超时, requestId:{}", requestId)
            clear_queue(hb_queue)

    @staticmethod
    def change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList,
                      deleteVideoList):
        """Refresh the status of every unfinished file from its future.

        Entries whose status is below SUCCESS are counted and then updated:
        a pending future marks the entry RUNNING with the uploader's current
        progress; a finished future marks it SUCCESS (URL filled in, local
        path queued for deletion) or FAILED when the future raised.

        Returns:
            ``num`` plus the number of entries that were below SUCCESS when
            this pass inspected them.
        """
        for image in imageList:
            if image["status"] < UploadStatusType.SUCCESS.value[0]:
                num += 1
                # image_task is [future, oss, local path]
                image_task = imageTask.get(image.get("fileName"))
                if image_task:
                    if not image_task[0].done():
                        image["status"] = UploadStatusType.RUNNING.value[0]
                        image["progress"] = image_task[1].get_progress()
                        continue
                    try:
                        # result() re-raises whatever the upload raised.
                        image_task[0].result()
                        image["imageUrl"] = image_task[1].get_image_url()
                        image["status"] = UploadStatusType.SUCCESS.value[0]
                        image["progress"] = "1.0000"
                        deleteImageList.append(image_task[2])
                    except Exception:
                        logger.error("文件{}上传失败, 异常: {}, requestId: {}", image_task[2], format_exc(),
                                     requestId)
                        image["status"] = UploadStatusType.FAILED.value[0]
                        image["progress"] = image_task[1].get_progress()
        for video in videoList:
            if video["status"] < UploadStatusType.SUCCESS.value[0]:
                num += 1
                # video_task is [future, vod, local path]
                video_task = videoTask.get(video.get("fileName"))
                if video_task:
                    if not video_task[0].done():
                        video["status"] = UploadStatusType.RUNNING.value[0]
                        video["progress"] = video_task[1].get_progress()
                        continue
                    try:
                        video_task[0].result()
                        video["videoUrl"] = video_task[1].get_video_url()
                        video["status"] = UploadStatusType.SUCCESS.value[0]
                        video["progress"] = "1.0000"
                        deleteVideoList.append(video_task[2])
                    except Exception:
                        logger.error("文件{}上传失败, 异常:{}, requestId: {}", video_task[2], format_exc(),
                                     requestId)
                        video["status"] = UploadStatusType.FAILED.value[0]
                        video["progress"] = video_task[1].get_progress()
        return num

    @staticmethod
    def stop_all_task(imageList, imageTask, videoList, videoTask):
        """Set the ``status`` flag on the uploader of every still-running task.

        NOTE(review): the flag is presumably polled by OssUtil/VodUtil to
        abort an in-flight upload -- confirm against util.AliyunUtil.
        """
        for image in imageList:
            if image["status"] < UploadStatusType.SUCCESS.value[0]:
                # image_task is [future, oss, local path]
                image_task = imageTask.get(image.get("fileName"))
                if image_task:
                    # Only tasks that have not finished are flagged.
                    if not image_task[0].done():
                        image_task[1].status = True
        for video in videoList:
            if video["status"] < UploadStatusType.SUCCESS.value[0]:
                # video_task is [future, vod, local path]
                video_task = videoTask.get(video.get("fileName"))
                if video_task:
                    if not video_task[0].done():
                        video_task[1].status = True
                        # The VOD uploader wraps a nested uploader that
                        # carries its own status flag.
                        if video_task[1].uploader is not None:
                            if video_task[1].uploader.uploader is not None:
                                video_task[1].uploader.uploader.status = True

    @staticmethod
    def stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList):
        """Settle the final status of unfinished files after the pools stopped.

        An entry whose uploader exposes a non-empty URL becomes SUCCESS and
        its local path is queued for deletion.  Otherwise it becomes CANCEL
        when the uploader's ``status`` flag is set and FAILED when it is not.
        """
        for image in imageList:
            if image["status"] < UploadStatusType.SUCCESS.value[0]:
                # image_task is [future, oss, local path]
                image_task = imageTask.get(image.get("fileName"))
                if image_task:
                    oss = image_task[1]
                    status = oss.status
                    image_url = oss.get_image_url()
                    if image_url is not None and len(image_url) > 0:
                        image["imageUrl"] = image_url
                        image["status"] = UploadStatusType.SUCCESS.value[0]
                        image["progress"] = "1.0000"
                        deleteImageList.append(image_task[2])
                    else:
                        if status:
                            image["status"] = UploadStatusType.CANCEL.value[0]
                        else:
                            image["status"] = UploadStatusType.FAILED.value[0]
                        image["progress"] = oss.get_progress()
        for video in videoList:
            if video["status"] < UploadStatusType.SUCCESS.value[0]:
                # video_task is [future, vod, local path]
                video_task = videoTask.get(video.get("fileName"))
                if video_task:
                    vod = video_task[1]
                    status = vod.status
                    video_url = vod.get_video_url()
                    if video_url is not None and len(video_url) > 0:
                        video["videoUrl"] = video_url
                        video["status"] = UploadStatusType.SUCCESS.value[0]
                        video["progress"] = "1.0000"
                        deleteVideoList.append(video_task[2])
                    else:
                        if status:
                            video["status"] = UploadStatusType.CANCEL.value[0]
                        else:
                            video["status"] = UploadStatusType.FAILED.value[0]
                        video["progress"] = vod.get_progress()

    @staticmethod
    def wait_thread(it, vt, requestId):
        """Shut down the image and video pools (when created) and log it."""
        if it:
            it.shutdown()
            logger.info("it线程池关闭完成m, requestId:{}", requestId)
        if vt:
            vt.shutdown()
            logger.info("vt线程池关闭完成m, requestId:{}", requestId)

    def _abort_uploads(self, it, vt, imageList, imageTask, videoList, videoTask, deleteImageList,
                       deleteVideoList):
        """Cancel queued work, flag running uploads, wait for the pools, settle statuses."""
        # Drop work that has not started yet.
        if it:
            it.clear_work_queue()
        if vt:
            vt.clear_work_queue()
        # Flag every upload that is still running.
        self.stop_all_task(imageList, imageTask, videoList, videoTask)
        # Wait for the worker threads to finish.
        if it:
            it.shutdown()
        if vt:
            vt.shutdown()
        # Record the final per-file status.
        self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)

    def run(self):
        """Process entry point: upload all files, clean up, send the final callback.

        Steps: validate the source directories, start the heartbeat thread,
        submit all uploads, poll until every file is settled (or a stop event,
        timeout or error aborts the task), delete uploaded files, move
        leftovers to the backup directories and report SUCCESS or FAILED.
        """
        # Per-file status entries and local paths to delete after upload.
        imageList, videoList = [], []
        deleteImageList, deleteVideoList = [], []
        # file name -> [future, uploader, local path]
        imageTask, videoTask = {}, {}
        # Backup and source directories.
        image_backup, video_backup = self.image_backup, self.video_backup
        imagePath, videoPath = self.imagePath, self.videoPath
        # Queues.
        fb_queue, hb_queue, event_queue = self.__fb_queue, Queue(), self.__event_queue
        requestId, service_config, callback_url = self.__requestId, self.__service_config, self.__callback_url
        it, vt, ex, hd = None, None, None, None
        try:
            init_log(service_config["base_dir"])
            logger.info("启动文件上传进程!requestId:{}", requestId)
            # Both source directories must exist; otherwise the configuration is wrong.
            if not exists(imagePath) or not exists(videoPath):
                logger.error("图片、视频路径检测失败, 图片或视频路径不存在!requestId:{}", requestId)
                logger.error("图片路径: {}, 视频路径: {}, requestId:{}", imagePath, videoPath, requestId)
                raise ServiceException(UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[0],
                                       UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[1])
            # Nothing to upload: return early (the finally block still sends
            # the final callback).
            image_path_array, video_path_array = get_all_images(imagePath), get_all_videos(videoPath)
            if len(image_path_array) == 0 and len(video_path_array) == 0:
                logger.info("未查询到本地视频及图片文件!!!requestId:{}", requestId)
                return
            # Start the heartbeat thread.
            hd = self.start_hd_thread(imageList, videoList, fb_queue, callback_url, hb_queue, requestId)
            # Create the image and video thread pools.
            it = ThreadPoolExecutorNew(max_workers=10, thread_name_prefix='图片线程池')
            vt = ThreadPoolExecutorNew(max_workers=2, thread_name_prefix='视频线程线程池')
            self.upload_method(service_config, video_path_array, videoTask, videoList, vt, image_path_array,
                               imageTask, imageList, it)
            start_time = time()
            while True:
                # Timeout check.
                if time() - start_time >= _TASK_TIMEOUT_SECONDS:
                    logger.error("上传文件任务执行超时!requestId: {}", requestId)
                    raise ServiceException(UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[0],
                                           UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[1])
                # Heartbeat check.
                if hd is not None and not hd.is_alive():
                    logger.error("心跳线程异常停止, requestId:{}", requestId)
                    raise Exception("心跳线程异常停止!")
                event_result = get_no_block_queue(event_queue)
                # Stop command: abort everything and leave the loop.
                if event_result is not None and event_result.get("command") == "stop":
                    self._abort_uploads(it, vt, imageList, imageTask, videoList, videoTask, deleteImageList,
                                        deleteVideoList)
                    break
                # num counts the files that were still unfinished in this pass.
                num = self.change_status(imageList, 0, imageTask, videoList, videoTask, requestId,
                                         deleteImageList, deleteVideoList)
                if num == 0:
                    break
                sleep(1)
        except ServiceException as s:
            ex = s.code, s.msg
            self._abort_uploads(it, vt, imageList, imageTask, videoList, videoTask, deleteImageList,
                                deleteVideoList)
            logger.error("上传文件任务异常失败: {}, requestId:{}", s.msg, requestId)
        except Exception:
            ex = UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0], \
                UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1]
            self._abort_uploads(it, vt, imageList, imageTask, videoList, videoTask, deleteImageList,
                                deleteVideoList)
            logger.error("上传文件任务异常失败: {}, requestId:{}", format_exc(), requestId)
        finally:
            try:
                # Delete the local copies of files that were uploaded.
                if len(deleteImageList) > 0 or len(deleteVideoList) > 0:
                    # Leaving the with-block waits for all submitted deletions.
                    with ThreadPoolExecutor(max_workers=20) as de:
                        for imageFile in deleteImageList:
                            logger.error("删除图片文件, 文件名: {}, requestId: {}", imageFile, requestId)
                            de.submit(delete_file, imageFile)
                        for videoFile in deleteVideoList:
                            logger.error("删除视频文件, 文件名: {}, requestId: {}", videoFile, requestId)
                            de.submit(delete_file, videoFile)
                self.wait_thread(it, vt, requestId)
                # Whatever is still in the source directories was not uploaded:
                # move it to the backup directories.
                image_path_array1, video_path_array1 = get_all_images(imagePath), get_all_videos(videoPath)
                if len(image_path_array1) > 0 or len(video_path_array1) > 0:
                    if len(image_path_array1) > 0:
                        create_dir(image_backup)
                    if len(video_path_array1) > 0:
                        create_dir(video_backup)
                    with ThreadPoolExecutor(max_workers=20) as ec:
                        for i in image_path_array1:
                            logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
                            ec.submit(move_file, i, image_backup)
                        for i in video_path_array1:
                            logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
                            ec.submit(move_file, i, video_backup)
                # Stop the heartbeat before the final callback is sent.
                self.stop_hd_thread(hd, hb_queue, requestId)
                if ex:
                    code, msg = ex
                    upload_http_result(fb_queue, callback_url, requestId, errorCode=code, errorMsg=msg,
                                       status=UploadTaskStatusType.FAILED.value[0],
                                       imageList=imageList, videoList=videoList)
                else:
                    upload_http_result(fb_queue, callback_url, requestId,
                                       status=UploadTaskStatusType.SUCCESS.value[0],
                                       imageList=imageList, videoList=videoList)
            except Exception:
                logger.error("未知异常: {}", format_exc())
                self.stop_hd_thread(hd, hb_queue, requestId)
                upload_http_result(fb_queue, callback_url, requestId,
                                   errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
                                   errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
                                   status=UploadTaskStatusType.FAILED.value[0],
                                   imageList=imageList, videoList=videoList)
            finally:
                self.stop_hd_thread(hd, hb_queue, requestId)
                clear_queue(event_queue, is_ex=False)
                clear_queue(hb_queue, is_ex=False)
                # NOTE(review): the backslash makes this path Windows-specific;
                # confirm it matches the directory the OSS uploader uses.
                tmp = join(service_config["base_dir"], r'tmp\oss')
                remove_after_create(tmp)
                logger.info("上传文件任务完成, requestId: {}", requestId)