You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

399 lines
20KB

  1. # -*- coding: utf-8 -*-
  2. from concurrent.futures import ThreadPoolExecutor
  3. from multiprocessing import Process, Queue
  4. from os.path import join, exists
  5. from threading import Thread
  6. from time import sleep, time
  7. from traceback import format_exc
  8. from loguru import logger
  9. from bean.Feedback import upload_result
  10. from enums.ExceptionEnum import UploadStrExceptionType
  11. from enums.StatusEnum import UploadTaskStatusType, UploadStatusType
  12. from exception.CustomerException import ServiceException
  13. from util.AliyunUtil import OssUtil, VodUtil
  14. from util.FileUtil import get_all_images, get_all_videos, get_file_name, delete_file, create_dir, \
  15. move_file, remove_after_create
  16. from util.LogUtils import init_log
  17. from util.QueUtil import put_queue, get_no_block_queue, clear_queue
  18. from util.ThreadUtil import ThreadPoolExecutorNew
  19. from util.TimeUtils import now_date_to_str, YMDHMS
  20. '''
  21. 视频上传
  22. '''
  23. class MqttUploadFileProcess(Process):
  24. __slots__ = ('__fb_queue', '__event_queue', '__service_config', '__requestId', '__current_time', 'videoPath',
  25. 'imagePath', 'image_backup', 'video_backup')
  26. def __init__(self, fb_queue, service_config, request_id):
  27. super().__init__()
  28. self.__fb_queue = fb_queue
  29. self.__requestId = request_id
  30. self.__event_queue = Queue()
  31. self.__service_config = service_config
  32. self.__current_time = now_date_to_str(YMDHMS)
  33. backup = join(service_config["file"]["backup"], self.__current_time)
  34. self.videoPath, self.imagePath = service_config["file"]['videoPath'], service_config["file"]['imagePath']
  35. self.image_backup, self.video_backup = join(backup, 'images'), join(backup, 'videos')
  36. upload_result(fb_queue, request_id, status=UploadTaskStatusType.WAITING.value[0])
  37. def sendEvent(self, result):
  38. put_queue(self.__event_queue, result, timeout=2, is_throw_ex=True)
  39. @staticmethod
  40. def uploadImage(oss, uploadPath, filePath):
  41. try:
  42. oss.resumable_upload(uploadPath, filePath)
  43. except Exception as e:
  44. oss.exception = e
  45. logger.error("oss上传文件异常: {}, uploadPath:{}, filePath:{}", format_exc(), uploadPath, filePath)
  46. raise e
  47. @staticmethod
  48. def uploadVideo(vod, file_title, filePath):
  49. try:
  50. vod.get_play_url(filePath, file_title)
  51. except Exception as e:
  52. vod.exception = e
  53. logger.error("oss上传文件异常: {}, file_title:{}, filePath:{}", format_exc(), file_title, filePath)
  54. raise e
  55. @staticmethod
  56. def move_method(file, backup):
  57. move_file(file, backup)
  58. @staticmethod
  59. def hdThread(imageList, videoList, fb_queue, hb_queue, requestId):
  60. try:
  61. logger.info("启动文件上传心跳线程, requestId:{}", requestId)
  62. start_time = time()
  63. count = 0
  64. while True:
  65. if time() - start_time > 43200:
  66. logger.error("心跳线程运行超时!!!!requestId:{}", requestId)
  67. break
  68. command = get_no_block_queue(hb_queue)
  69. if command is not None:
  70. if "stop" == command.get("command"):
  71. logger.info("开始停止心跳线程!!!!requestId:{}", requestId)
  72. break
  73. if count % 5 == 0:
  74. upload_result(fb_queue, requestId, status=UploadTaskStatusType.RUNNING.value[0],
  75. imageList=imageList, videoList=videoList)
  76. count = 0
  77. count += 1
  78. sleep(1)
  79. except Exception:
  80. logger.error("心跳线程异常:{}, requestId:{}", format_exc(), requestId)
  81. logger.info("心跳线程停止完成!requestId:{}", requestId)
  82. def upload_method(self, service_config, video_path_array, videoTask, videoList, vt, image_path_array, imageTask,
  83. imageList, it):
  84. current_time = self.__current_time
  85. for i in video_path_array:
  86. vod = VodUtil(service_config["base_dir"])
  87. filename = get_file_name(i)
  88. video_result = vt.submit(self.uploadVideo, vod, filename, i)
  89. videoTask[filename] = [video_result, vod, i]
  90. videoList.append({"fileName": filename, "videoUrl": "", "status": UploadStatusType.WAITING.value[0],
  91. "progress": vod.get_progress()})
  92. for i in image_path_array:
  93. oss = OssUtil(service_config["base_dir"])
  94. fileName = get_file_name(i)
  95. uploadPath = "%s/%s" % (current_time, fileName)
  96. image_result = it.submit(self.uploadImage, oss, uploadPath, i)
  97. imageTask[fileName] = [image_result, oss, i]
  98. imageList.append({"fileName": fileName, "imageUrl": "", "status": UploadStatusType.WAITING.value[0],
  99. "progress": oss.progress})
  100. def start_hd_thread(self, imageList, videoList, fb_queue, hb_queue, requestId):
  101. hd = Thread(target=self.hdThread, args=(imageList, videoList, fb_queue, hb_queue, requestId))
  102. hd.setDaemon(True)
  103. hd.start()
  104. return hd
  105. @staticmethod
  106. def stop_hd_thread(hb, hb_queue, requestId):
  107. if hb is not None and hb.is_alive():
  108. put_queue(hb_queue, {"command": "stop"}, timeout=2, is_throw_ex=False)
  109. start = time()
  110. hb.join(60)
  111. if time() - start > 60:
  112. logger.error("心跳线程停止超时, requestId:{}", requestId)
  113. clear_queue(hb_queue)
  114. @staticmethod
  115. def change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList, deleteVideoList):
  116. for image in imageList:
  117. if image["status"] < UploadStatusType.SUCCESS.value[0]:
  118. num += 1
  119. image_task = imageTask.get(image.get("fileName"))
  120. # (image_result, oss, i)
  121. if image_task:
  122. if not image_task[0].done():
  123. image["status"] = UploadStatusType.RUNNING.value[0]
  124. image["progress"] = image_task[1].get_progress()
  125. continue
  126. if image_task[0].done():
  127. try:
  128. image_task[0].result()
  129. image["imageUrl"] = image_task[1].get_image_url()
  130. image["status"] = UploadStatusType.SUCCESS.value[0]
  131. image["progress"] = "1.0000"
  132. deleteImageList.append(image_task[2])
  133. # delete_file(image_task[2])
  134. except Exception:
  135. logger.error("文件{}上传失败, 异常: {}, requestId: {}", image_task[2], format_exc(),
  136. requestId)
  137. image["status"] = UploadStatusType.FAILED.value[0]
  138. image["progress"] = image_task[1].get_progress()
  139. for video in videoList:
  140. if video["status"] < UploadStatusType.SUCCESS.value[0]:
  141. num += 1
  142. video_task = videoTask.get(video.get("fileName"))
  143. if video_task:
  144. # 如果任务已经完成
  145. # (video_result, vod, i)
  146. if not video_task[0].done():
  147. video["status"] = UploadStatusType.RUNNING.value[0]
  148. video["progress"] = video_task[1].get_progress()
  149. continue
  150. if video_task[0].done():
  151. try:
  152. video_task[0].result()
  153. video["videoUrl"] = video_task[1].get_video_url()
  154. video["status"] = UploadStatusType.SUCCESS.value[0]
  155. video["progress"] = "1.0000"
  156. deleteVideoList.append(video_task[2])
  157. # delete_file(video_task[2])
  158. except Exception:
  159. logger.error("文件{}上传失败, 异常:{}, requestId: {}", video_task[2], format_exc(),
  160. requestId)
  161. video["status"] = UploadStatusType.FAILED.value[0]
  162. video["progress"] = video_task[1].get_progress()
  163. return num
  164. @staticmethod
  165. def stop_all_task(imageList, imageTask, videoList, videoTask):
  166. for image in imageList:
  167. if image["status"] < UploadStatusType.SUCCESS.value[0]:
  168. image_task = imageTask.get(image.get("fileName"))
  169. # (image_result, oss, i)
  170. if image_task:
  171. if not image_task[0].done():
  172. # 如果任务没有结束, 停止设置取消任务
  173. image_task[1].status = True
  174. for video in videoList:
  175. if video["status"] < UploadStatusType.SUCCESS.value[0]:
  176. video_task = videoTask.get(video.get("fileName"))
  177. if video_task:
  178. # 如果任务已经完成
  179. # (video_result, vod, i)
  180. if not video_task[0].done():
  181. video_task[1].status = True
  182. if video_task[1].uploader is not None:
  183. if video_task[1].uploader.uploader is not None:
  184. video_task[1].uploader.uploader.status = True
  185. @staticmethod
  186. def stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList):
  187. for image in imageList:
  188. if image["status"] < UploadStatusType.SUCCESS.value[0]:
  189. image_task = imageTask.get(image.get("fileName"))
  190. # (image_result, oss, i)
  191. if image_task:
  192. oss = image_task[1]
  193. status = oss.status
  194. image_url = oss.get_image_url()
  195. if image_url is not None and len(image_url) > 0:
  196. image["imageUrl"] = image_url
  197. image["status"] = UploadStatusType.SUCCESS.value[0]
  198. image["progress"] = "1.0000"
  199. deleteImageList.append(image_task[2])
  200. # delete_file(image_task[2])
  201. else:
  202. if status:
  203. image["status"] = UploadStatusType.CANCEL.value[0]
  204. image["progress"] = oss.get_progress()
  205. else:
  206. image["status"] = UploadStatusType.FAILED.value[0]
  207. image["progress"] = oss.get_progress()
  208. for video in videoList:
  209. if video["status"] < UploadStatusType.SUCCESS.value[0]:
  210. video_task = videoTask.get(video.get("fileName"))
  211. if video_task:
  212. # 如果任务已经完成
  213. # (video_result, vod, i)
  214. vod = video_task[1]
  215. status = vod.status
  216. video_url = vod.get_video_url()
  217. if video_url is not None and len(video_url) > 0:
  218. video["videoUrl"] = video_url
  219. video["status"] = UploadStatusType.SUCCESS.value[0]
  220. video["progress"] = "1.0000"
  221. deleteVideoList.append(video_task[2])
  222. # delete_file(video_task[2])
  223. else:
  224. if status:
  225. video["status"] = UploadStatusType.CANCEL.value[0]
  226. video["progress"] = vod.get_progress()
  227. else:
  228. video["status"] = UploadStatusType.FAILED.value[0]
  229. video["progress"] = vod.get_progress()
  230. @staticmethod
  231. def wait_thread(it, vt, requestId):
  232. if it:
  233. it.shutdown()
  234. logger.info("it线程池关闭完成m, requestId:{}", requestId)
  235. if vt:
  236. vt.shutdown()
  237. logger.info("vt线程池关闭完成m, requestId:{}", requestId)
  238. def run(self):
  239. imageList, videoList = [], []
  240. deleteImageList, deleteVideoList = [], []
  241. # 线程任务
  242. imageTask, videoTask = {}, {}
  243. # 备份路径
  244. image_backup, video_backup = self.image_backup, self.video_backup
  245. imagePath, videoPath = self.imagePath, self.videoPath
  246. # 队列
  247. fb_queue, hb_queue, event_queue = self.__fb_queue, Queue(), self.__event_queue
  248. requestId, service_config = self.__requestId, self.__service_config
  249. it, vt, ex, hd = None, None, None, None
  250. try:
  251. init_log(service_config["base_dir"])
  252. logger.info("启动文件上传进程!requestId:{}", requestId)
  253. # 检查文件路径是否存在,不存在则路径有问题
  254. if not exists(imagePath) or not exists(videoPath):
  255. logger.error("图片、视频路径检测失败, 图片或视频路径不存在!requestId:{}", requestId)
  256. logger.error("图片路径: {}, 视频路径: {}, requestId:{}", imagePath, videoPath, requestId)
  257. raise ServiceException(UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[0],
  258. UploadStrExceptionType.FILE_PATH_IS_NOT_AREADLY.value[1])
  259. # 检查是否有视频和图片,没有直接返回
  260. image_path_array, video_path_array = get_all_images(imagePath), get_all_videos(videoPath)
  261. if len(image_path_array) == 0 and len(video_path_array) == 0:
  262. logger.info("未查询到本地视频及图片文件!!!requestId:{}", requestId)
  263. # upload_result(fb_queue, requestId, status=UploadTaskStatusType.SUCCESS.value[0])
  264. return
  265. # 启动心跳线程
  266. hd = self.start_hd_thread(imageList, videoList, fb_queue, hb_queue, requestId)
  267. # 初始化图片、视频线程池
  268. it = ThreadPoolExecutorNew(max_workers=10, thread_name_prefix='图片线程池')
  269. vt = ThreadPoolExecutorNew(max_workers=2, thread_name_prefix='视频线程线程池')
  270. self.upload_method(service_config, video_path_array, videoTask, videoList, vt, image_path_array, imageTask,
  271. imageList, it)
  272. start_time = time()
  273. while True:
  274. # 超时检查
  275. if time() - start_time >= 43200:
  276. logger.error("上传文件任务执行超时!requestId: {}", requestId)
  277. raise ServiceException(UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[0],
  278. UploadStrExceptionType.TASK_IS_RUN_TIMEOUT.value[1])
  279. # 心跳检查
  280. if hd is not None and not hd.is_alive():
  281. logger.error("心跳线程异常停止, requestId:{}", requestId)
  282. raise Exception("心跳线程异常停止!")
  283. event_result = get_no_block_queue(event_queue)
  284. # 终止任务
  285. if event_result is not None and event_result.get("command") == "stop":
  286. # 清空线程队列
  287. it.clear_work_queue()
  288. vt.clear_work_queue()
  289. # 停止所有任务
  290. self.stop_all_task(imageList, imageTask, videoList, videoTask)
  291. # 等待线程任务完成
  292. it.shutdown()
  293. vt.shutdown()
  294. # 更新任务状态
  295. self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList,
  296. deleteVideoList)
  297. break
  298. num = 0
  299. num = self.change_status(imageList, num, imageTask, videoList, videoTask, requestId, deleteImageList,
  300. deleteVideoList)
  301. if num == 0:
  302. break
  303. sleep(1)
  304. except ServiceException as s:
  305. ex = s.code, s.msg
  306. if it:
  307. it.clear_work_queue()
  308. if vt:
  309. vt.clear_work_queue()
  310. # 停止所有任务
  311. self.stop_all_task(imageList, imageTask, videoList, videoTask)
  312. if it:
  313. it.shutdown()
  314. if vt:
  315. vt.shutdown()
  316. self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
  317. logger.error("上传文件任务异常失败: {}, requestId:{}", s.msg, requestId)
  318. except Exception:
  319. ex = UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0], \
  320. UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1]
  321. if it:
  322. it.clear_work_queue()
  323. if vt:
  324. vt.clear_work_queue()
  325. # 停止所有任务
  326. self.stop_all_task(imageList, imageTask, videoList, videoTask)
  327. if it:
  328. it.shutdown()
  329. if vt:
  330. vt.shutdown()
  331. self.stop_change_status(imageList, imageTask, videoList, videoTask, deleteImageList, deleteVideoList)
  332. logger.error("上传文件任务异常失败: {}, requestId:{}", format_exc(), requestId)
  333. finally:
  334. try:
  335. if len(deleteImageList) > 0 or len(deleteVideoList) > 0:
  336. with ThreadPoolExecutor(max_workers=20) as de:
  337. for imageFile in deleteImageList:
  338. logger.error("删除图片文件, 文件名: {}, requestId: {}", imageFile, requestId)
  339. de.submit(delete_file, imageFile)
  340. for videoFile in deleteVideoList:
  341. logger.error("删除视频文件, 文件名: {}, requestId: {}", videoFile, requestId)
  342. de.submit(delete_file, videoFile)
  343. de.shutdown(wait=True)
  344. self.wait_thread(it, vt, requestId)
  345. image_path_array1, video_path_array1 = get_all_images(imagePath), get_all_videos(videoPath)
  346. if len(image_path_array1) > 0 or len(video_path_array1) > 0:
  347. with ThreadPoolExecutor(max_workers=20) as ec:
  348. for i in image_path_array1:
  349. create_dir(image_backup)
  350. logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
  351. ec.submit(move_file, i, image_backup)
  352. for i in video_path_array1:
  353. create_dir(video_backup)
  354. logger.error("上传文件失败文件迁移备份, 文件名: {}, requestId: {}", i, requestId)
  355. ec.submit(move_file, i, video_backup)
  356. ec.shutdown(wait=True)
  357. self.stop_hd_thread(hd, hb_queue, requestId)
  358. if ex:
  359. code, msg = ex
  360. upload_result(fb_queue, requestId, errorCode=code, errorMsg=msg,
  361. status=UploadTaskStatusType.FAILED.value[0], imageList=imageList, videoList=videoList)
  362. else:
  363. upload_result(fb_queue, requestId, status=UploadTaskStatusType.SUCCESS.value[0], imageList=imageList,
  364. videoList=videoList)
  365. except Exception:
  366. logger.error("未知异常: {}", format_exc())
  367. self.stop_hd_thread(hd, hb_queue, requestId)
  368. upload_result(fb_queue, requestId,
  369. errorCode=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[0],
  370. errorMsg=UploadStrExceptionType.SERVICE_INNER_EXCEPTION.value[1],
  371. status=UploadTaskStatusType.FAILED.value[0],
  372. imageList=imageList, videoList=videoList)
  373. finally:
  374. self.stop_hd_thread(hd, hb_queue, requestId)
  375. clear_queue(event_queue, is_ex=False)
  376. clear_queue(hb_queue, is_ex=False)
  377. tmp = join(service_config["base_dir"], r'tmp\oss')
  378. remove_after_create(tmp)
  379. logger.info("上传文件任务完成, requestId: {}", requestId)