# -*- coding: utf-8 -*- import datetime import cv2 import oss2 import time from loguru import logger ''' 图片上传使用OSS 1. 阿里云对象存储OSS官网地址:https://help.aliyun.com/product/31815.html?spm=a2c4g.32006.0.0.8c546cf0BpkAQ2 2. 阿里云对象存储OSS SDK示例地址:https://help.aliyun.com/document_detail/32006.html?spm=a2c4g.32006.0.0.66874b78q1pwLa 3. python安装SDK地址: https://help.aliyun.com/document_detail/85288.html?spm=a2c4g.32026.0.0.3f24417coCphWj 4. 安装SDK: pip install oss2 5. 安装python-devel 安装python-devel 由于SDK需要crcmod库计算CRC校验码,而crcmod依赖Python.h文件,如果系统缺少这个头文件,安装SDK不会失败,但crcmod的C扩展模式安装会失败,因此导致上传、下载等操作效率非常低下。 如果python-devel包不存在,则首先要安装这个包。 对于Windows系统和Mac OS X系统,由于安装Python的时候会将Python依赖的头文件一并安装,因此您无需安装python-devel。 对于CentOS、RHEL、Fedora系统,请执行以下命令安装python-devel。 sudo yum install python-devel 对于Debian,Ubuntu系统,请执行以下命令安装python-devel。 sudo apt-get install python-dev 6、图片域名地址:https://image.t-aaron.com/ ''' class AliyunOssSdk: def __init__(self): self.__client = None self.__access_key = 'LTAI5tMiefafZ6br4zmrQWv9' self.__access_secret = 'JgzQjSCkwZ7lefZO6egOArw38YH1Tk' self.__endpoint = 'http://oss-cn-shanghai.aliyuncs.com' self.__bucket = 'ta-tech-image' def get_oss_bucket(self): if not self.__client: auth = oss2.Auth(self.__access_key, self.__access_secret) self.__client = oss2.Bucket(auth, self.__endpoint, self.__bucket, connect_timeout=30) def upload_file(self, updatePath, fileByte): logger.info("开始上传文件到oss!") MAX_RETRIES = 3 retry_count = 0 while True: try: self.get_oss_bucket() result = self.__client.put_object(updatePath, fileByte) return result logger.info("上传文件到oss成功!") break except Exception as e: self.__client = None retry_count += 1 time.sleep(1) logger.info("上传文件到oss失败, 重试次数:{}", retry_count) if retry_count > MAX_RETRIES: logger.exception("上传文件到oss重试失败:{}", e) raise e YY_MM_DD_HH_MM_SS = "%Y-%m-%d %H:%M:%S" YMDHMSF = "%Y%m%d%H%M%S%f" def generate_timestamp(): """根据当前时间获取时间戳,返回整数""" return int(time.time()) def now_date_to_str(fmt=None): if fmt is None: fmt = YY_MM_DD_HH_MM_SS return datetime.datetime.now().strftime(fmt) if __name__ == "__main__": # 初始化oss对象 ossClient = AliyunOssSdk() # 读取本地图片 image_frame = cv2.imread('aaa.jpeg') or_result, or_image = cv2.imencode(".jpg", image_frame) # 图片名称命名规则 # 1、base_dir 基本文件夹名称,由拓恒公司传参 # 2、time_now 现在的时间 # 3、current_frame 当前视频的帧数 # 4、last_frame 如果有跳帧操作, 填写跳帧的步长,如果没有,和current_frame参数保持一致 # 5、random_num 随机时间字符串 # 6、mode_type 类型:实时视频直播的方式用(online) 离线视频直播(填写视频地址识别)用(offline) # 7、requestId 请求id, 拓恒公司传参 # 8、image_type 原图用(OR) AI识别后的图片用(AI) random_num = now_date_to_str(YMDHMSF) time_now = now_date_to_str("%Y-%m-%d-%H-%M-%S") image_format = "{base_dir}/{time_now}_frame-{current_frame}-{last_frame}_type_{random_num}-{mode_type}-{base_dir}" \ "-{requestId}_{image_type}.jpg" image_name = image_format.format( base_dir='PWL202304141639429276', time_now=time_now, current_frame='0', last_frame='0', random_num=random_num, mode_type='offline', requestId='111111111111111111', image_type='OR') result = ossClient.upload_file(image_name, or_image.tobytes()) # print('http status: {0}'.format(result.status)) # # 请求ID。请求ID是本次请求的唯一标识,强烈建议在程序日志中添加此参数。 # print('request_id: {0}'.format(result.request_id)) # # ETag是put_object方法返回值特有的属性,用于标识一个Object的内容。 # print('ETag: {0}'.format(result.etag)) # # HTTP响应头部。 # print('date: {0}'.format(result.headers['date'])) # print(result.__reduce__()) # 对于图片上传, 上传成功后,直接将image_name给拓恒公司就可以了 # 如果测试查看图片是否上传成功 # 可以使用域名拼接 image_url = 'https://image.t-aaron.com/' + image_name print(image_url) # 拓恒公司只需要image_name