|
- import shutil
- import time
- from glob import glob
- from os import remove, makedirs
- from os.path import join, isfile, exists, basename, splitext
- from traceback import format_exc
-
- from loguru import logger
-
- from enums.ExceptionEnum import ExceptionType
- from exception.CustomerException import ServiceException
- from util.ImageUtil import is_jpeg_image
- from util.VideoUtil import is_mp4_video
-
-
- def get_all_images(directory):
- images = []
- # 使用glob模块的通配符匹配查找所有图片文件
- image_files = glob(join(directory, '*.jpg')) + glob(join(directory, '*.jpeg')) + glob(join(directory, '*.png'))
- logger.info("识别到图片的数量: {}!", len(image_files))
- for image_file in image_files:
- if isfile(image_file) and is_jpeg_image(image_file):
- images.append(image_file)
- logger.info("识别是图片类型的数量: {}!", len(images))
- return images
-
-
- def get_all_videos(directory):
- videos = []
- # 使用glob模块的通配符匹配查找所有图片文件
- video_files = glob(join(directory, '*.mp4'))
- logger.info("识别到视频的数量: {}!", len(video_files))
- for video_file in video_files:
- if isfile(video_file) and is_mp4_video(video_file):
- videos.append(video_file)
- logger.info("识别是视频类型的数量: {}!", len(videos))
- return videos
-
-
- def delete_file(file_path, is_ex=False):
- retry_count = 0
- while retry_count < 3:
- try:
- if exists(file_path):
- remove(file_path)
- logger.info("删除文件成功: {}!", file_path)
- break
- except Exception:
- logger.error("删除文件失败: {}, 异常信息: {}", file_path, format_exc())
- if is_ex:
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
- finally:
- time.sleep(0.5)
- retry_count += 1
-
-
- def move_file(source_path, destination_path, is_ex=False):
- retry_count = 0
- while retry_count < 3:
- try:
- if exists(source_path) and exists(destination_path):
- shutil.move(source_path, destination_path)
- logger.info("迁移文件夹到指定文件夹成功! 原文件: {}, 目标文件: {}", source_path, destination_path)
- break
- except shutil.Error as sh:
- if "already exists" in str(sh):
- remove(source_path)
- break
- except OSError:
- logger.error("迁移文件夹到指定文件夹成功! 原文件: {}, 目标文件: {}, 异常信息: {}", source_path,
- destination_path,
- format_exc())
- if is_ex:
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
-
- finally:
- time.sleep(0.5)
- retry_count += 1
-
-
- def create_dir(path, is_ex=False):
- try:
- if not exists(path):
- makedirs(path)
- logger.info("创建文件{}成功!!!", path)
- except Exception:
- logger.info("创建文件{}失败!异常: {}", path, format_exc())
- if is_ex:
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
-
- def remove_file(path, is_ex=False):
- try:
- if exists(path):
- shutil.rmtree(path)
- except Exception:
- logger.info("删除{}文件异常, 异常: {}", path, format_exc())
- if is_ex:
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
-
- def remove_after_create(path, is_ex=False):
- try:
- if exists(path):
- shutil.rmtree(path)
- create_dir(path)
- except Exception:
- logger.info("删除{}文件夹异常, 异常: {}", path, format_exc())
- if is_ex:
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
-
- def get_file_name(path):
- try:
- return basename(path)
- except Exception:
- logger.info("获取{}文件名失败!异常: {}", path, format_exc())
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
-
- def get_file_name_0(path):
- try:
- return splitext(basename(path))[0]
- except Exception:
- logger.info("获取{}文件名失败!异常: {}", path, format_exc())
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
-
- def get_file_name_1(path):
- try:
- return splitext(basename(path))[1]
- except Exception:
- logger.info("获取{}文件名失败!异常: {}", path, format_exc())
- raise ServiceException(ExceptionType.SERVICE_INNER_EXCEPTION.value[0],
- ExceptionType.SERVICE_INNER_EXCEPTION.value[1])
-
- # if __name__ == '__main__':
- # # print(get_all_videos(r"D:\test\video"))
- # # move_file(r"D:\test\video\111.mp4", r'D:\test\video\aa')
- # # iamges = get_all_images(r"D:\test\image")
- # # for i in iamges:
- # # print(get_file_name_0(i))
- # remove_file(r"D:\tuoheng\codenew\tuoheng_airprt_media\tmp\oss\aaa")
|