Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

744 lines
35KB

  1. # -*- coding: UTF-8 -*-
  2. import json
  3. from traceback import format_exc
  4. import oss2
  5. import base64
  6. import time
  7. from aliyunsdkcore import client
  8. from aliyunsdkvod.request.v20170321 import CreateUploadVideoRequest
  9. from aliyunsdkvod.request.v20170321 import RefreshUploadVideoRequest
  10. from aliyunsdkvod.request.v20170321 import CreateUploadImageRequest
  11. from aliyunsdkvod.request.v20170321 import CreateUploadAttachedMediaRequest
  12. from vodsdk.AliyunVodUtils import *
  13. from loguru import logger
  14. VOD_MAX_TITLE_LENGTH = 128
  15. VOD_MAX_DESCRIPTION_LENGTH = 1024
  class AliyunVodUploader:
      """Uploads video/audio, m3u8, image and attached-media files to Aliyun VoD.

      Each upload first asks the VoD API for an upload address and temporary
      credentials, then pushes the file to the returned OSS bucket through
      ``_VodResumableUploader``.  The most recently created uploader is kept
      in ``self.uploader``.
      """

      __slots__ = (
          '__accessKeyId',
          '__accessKeySecret',
          '__ecsRegion',
          '__vodApiRegion',
          '__connTimeout',
          '__bucketClient',
          '__maxRetryTimes',
          '__vodClient',
          '__EnableCrc',
          '__multipartThreshold',
          '__multipartPartSize',
          '__multipartThreadsNum',
          '__progress_callback',
          'uploader'
      )

      def __init__(self, accessKeyId, accessKeySecret, ecsRegionId=None, progress_callback=None):
          """
          Create an uploader and initialise the VoD API client for cn-shanghai.

          :param accessKeyId: string, access key id
          :param accessKeySecret: string, access key secret
          :param ecsRegionId: string, region of the ECS host running this code
              (e.g. cn-beijing); passed to AliyunVodUtils.convertOssInternal when
              the OSS endpoint is built.
              See https://help.aliyun.com/document_detail/40654.html
          :param progress_callback: optional callable(consumedBytes, totalBytes)
              invoked after each uploaded part; when None, uploadProgressCallback
              is used.
          """
          self.__accessKeyId = accessKeyId
          self.__accessKeySecret = accessKeySecret
          self.__ecsRegion = ecsRegionId
          self.__vodApiRegion = None
          self.__connTimeout = 60
          self.__bucketClient = None
          self.__maxRetryTimes = 5
          self.__vodClient = None
          self.__EnableCrc = True
          # Multipart upload settings.
          self.__multipartThreshold = 10 * 1024 * 1024  # files larger than this use multipart upload
          self.__multipartPartSize = 10 * 1024 * 1024  # part size in bytes
          self.__multipartThreadsNum = 3  # stored and forwarded to the uploader
          # Setting the API region also builds self.__vodClient.
          self.setApiRegion('cn-shanghai')
          self.__progress_callback = progress_callback
          logger.info("初始化阿里云视频上传sdk,连接超时时间:{}, 重试次数:{}", self.__connTimeout, self.__maxRetryTimes)
          self.uploader = None

      def setApiRegion(self, apiRegion):
          """
          Set the VoD API region and rebuild the API client for it.

          Mainland China uses cn-shanghai; overseas regions such as
          ap-southeast-1 are listed at
          https://help.aliyun.com/document_detail/98194.html

          :param apiRegion: region id of the VoD endpoint
          :return: None
          """
          self.__vodApiRegion = apiRegion
          self.__vodClient = self.__initVodClient()

      def __initVodClient(self):
          """Build an AcsClient from the stored credentials, region, retry count and timeout."""
          return client.AcsClient(self.__accessKeyId, self.__accessKeySecret, self.__vodApiRegion,
                                  auto_retry=True, max_retry_time=self.__maxRetryTimes,
                                  timeout=self.__connTimeout)

      def setMultipartUpload(self, multipartThreshold=10 * 1024 * 1024, multipartPartSize=10 * 1024 * 1024,
                             multipartThreadsNum=1):
          """
          Override the multipart settings; non-positive values leave a setting unchanged.

          :param multipartThreshold: size in bytes above which multipart upload is used
          :param multipartPartSize: part size in bytes
          :param multipartThreadsNum: thread count forwarded to the uploader
          """
          if multipartThreshold > 0:
              self.__multipartThreshold = multipartThreshold
          if multipartPartSize > 0:
              self.__multipartPartSize = multipartPartSize
          if multipartThreadsNum > 0:
              self.__multipartThreadsNum = multipartThreadsNum

      def setEnableCrc(self, isEnable=False):
          """Enable or disable CRC checking on the OSS bucket client (stored as a bool)."""
          self.__EnableCrc = bool(isEnable)

      @catch_error
      def uploadLocalVideo(self, uploadVideoRequest, startUploadCallback=None):
          """
          Upload a local video or audio file to VoD.

          :param uploadVideoRequest: UploadVideoRequest whose filePath is the local file path
          :param startUploadCallback: optional callable(uploadId, uploadInfo) invoked once the
              upload address and credentials are known, before the transfer starts
          :return: the uploadInfo dict produced for this upload (not only the VideoId)
          """
          uploadInfo = self.__createUploadVideo(uploadVideoRequest)
          if startUploadCallback:
              startUploadCallback(uploadVideoRequest.uploadId, uploadInfo)
          headers = self.__getUploadHeaders(uploadVideoRequest)
          self.__uploadOssObjectWithRetry(uploadVideoRequest.filePath,
                                          uploadInfo['UploadAddress']['FileName'],
                                          uploadInfo, headers)
          return uploadInfo

      @catch_error
      def uploadWebVideo(self, uploadVideoRequest, startUploadCallback=None):
          """
          Download a remote video or audio file to a local temp file, then upload it to VoD.

          :param uploadVideoRequest: UploadVideoRequest whose filePath is the file URL
          :param startUploadCallback: optional callable(uploadId, uploadInfo) invoked before
              the transfer starts
          :return: the VideoId of the uploaded media
          """
          uploadVideoRequest = self.__downloadWebMedia(uploadVideoRequest)
          try:
              uploadInfo = self.__createUploadVideo(uploadVideoRequest)
              if startUploadCallback:
                  startUploadCallback(uploadVideoRequest.uploadId, uploadInfo)
              headers = self.__getUploadHeaders(uploadVideoRequest)
              self.__uploadOssObjectWithRetry(uploadVideoRequest.filePath,
                                              uploadInfo['UploadAddress']['FileName'],
                                              uploadInfo, headers)
          finally:
              # Remove the downloaded temp file even when the upload failed.
              self.__removeTempFile(uploadVideoRequest.filePath)
          return uploadInfo['VideoId']

      @catch_error
      def uploadLocalM3u8(self, uploadVideoRequest, sliceFilePaths=None):
          """
          Upload a local m3u8 index and its slice files to VoD.

          The index is rewritten to a temp copy that references slices by file name
          only, so the uploaded index contains relative addresses.

          :param uploadVideoRequest: UploadVideoRequest whose filePath is the local m3u8 path
          :param sliceFilePaths: list of local slice paths, e.g.
              ['/opt/m3u8_video/sample_001.ts', '/opt/m3u8_video/sample_002.ts'];
              when None they are derived with parseLocalM3u8 (same directory as the index)
          :return: the VideoId of the uploaded media
          :raises AliyunVodException: when no valid slice list is available
          """
          if sliceFilePaths is None:
              sliceFilePaths = self.parseLocalM3u8(uploadVideoRequest.filePath)
          if not isinstance(sliceFilePaths, list) or len(sliceFilePaths) <= 0:
              raise AliyunVodException('InvalidM3u8SliceFile', 'M3u8 slice files invalid',
                                       'sliceFilePaths invalid or m3u8 index file error')

          downloader = AliyunVodDownloader()
          m3u8LocalDir = downloader.getSaveLocalDir() + '/' + AliyunVodUtils.getStringMd5(uploadVideoRequest.fileName)
          downloader.setSaveLocalDir(m3u8LocalDir)
          m3u8LocalPath = m3u8LocalDir + '/' + os.path.basename(uploadVideoRequest.fileName)
          try:
              self.__rewriteM3u8File(uploadVideoRequest.filePath, m3u8LocalPath, True)
              # Request upload credentials for the rewritten index.
              uploadVideoRequest.setFilePath(m3u8LocalPath)
              uploadInfo = self.__createUploadVideo(uploadVideoRequest)
              uploadAddress = uploadInfo['UploadAddress']
              headers = self.__getUploadHeaders(uploadVideoRequest)
              # Slices first, index last.
              for sliceFilePath in sliceFilePaths:
                  _, sliceFileName = AliyunVodUtils.getFileBriefPath(sliceFilePath)
                  self.__uploadOssObjectWithRetry(sliceFilePath, uploadAddress['ObjectPrefix'] + sliceFileName,
                                                  uploadInfo, headers)
              self.__uploadOssObjectWithRetry(m3u8LocalPath, uploadAddress['FileName'], uploadInfo, headers)
          finally:
              # Only the rewritten index is temporary; the caller's slices are left alone.
              self.__cleanupM3u8Temp(m3u8LocalDir, [m3u8LocalPath])
          return uploadInfo['VideoId']

      @catch_error
      def uploadWebM3u8(self, uploadVideoRequest, sliceFileUrls=None):
          """
          Download a remote m3u8 index and its slices to a temp directory, then upload them.

          :param uploadVideoRequest: UploadVideoRequest whose filePath is the m3u8 URL
          :param sliceFileUrls: list of slice URLs, e.g.
              ['http://host/sample_001.ts', 'http://host/sample_002.ts'];
              when None they are derived with parseWebM3u8 (same URL prefix as the index)
          :return: the VideoId of the uploaded media
          :raises AliyunVodException: when no valid slice list is available or a slice
              download yields no local path
          """
          if sliceFileUrls is None:
              sliceFileUrls = self.parseWebM3u8(uploadVideoRequest.filePath)
          if not isinstance(sliceFileUrls, list) or len(sliceFileUrls) <= 0:
              raise AliyunVodException('InvalidM3u8SliceFile', 'M3u8 slice urls invalid',
                                       'sliceFileUrls invalid or m3u8 index file error')

          downloader = AliyunVodDownloader()
          m3u8LocalDir = downloader.getSaveLocalDir() + '/' + AliyunVodUtils.getStringMd5(uploadVideoRequest.fileName)
          downloader.setSaveLocalDir(m3u8LocalDir)
          m3u8LocalPath = m3u8LocalDir + '/' + os.path.basename(uploadVideoRequest.fileName)
          sliceList = []  # (local path, slice file name) pairs
          try:
              self.__rewriteM3u8File(uploadVideoRequest.filePath, m3u8LocalPath, False)
              for sliceFileUrl in sliceFileUrls:
                  _, sliceFileName = AliyunVodUtils.getFileBriefPath(sliceFileUrl)
                  _, sliceLocalPath = downloader.downloadFile(sliceFileUrl, sliceFileName)
                  if sliceLocalPath is None:
                      raise AliyunVodException('FileDownloadError', 'Download M3u8 File Error', '')
                  sliceList.append((sliceLocalPath, sliceFileName))
              # Request upload credentials for the rewritten index.
              uploadVideoRequest.setFilePath(m3u8LocalPath)
              uploadInfo = self.__createUploadVideo(uploadVideoRequest)
              uploadAddress = uploadInfo['UploadAddress']
              headers = self.__getUploadHeaders(uploadVideoRequest)
              # Slices first, index last.
              for sliceLocalPath, sliceFileName in sliceList:
                  self.__uploadOssObjectWithRetry(sliceLocalPath, uploadAddress['ObjectPrefix'] + sliceFileName,
                                                  uploadInfo, headers)
              self.__uploadOssObjectWithRetry(m3u8LocalPath, uploadAddress['FileName'], uploadInfo, headers)
          finally:
              # Remove the rewritten index and every slice downloaded so far.
              self.__cleanupM3u8Temp(m3u8LocalDir, [m3u8LocalPath] + [item[0] for item in sliceList])
          return uploadInfo['VideoId']

      @catch_error
      def uploadImage(self, uploadImageRequest, isLocalFile=True):
          """
          Upload a local or remote image to VoD.

          :param uploadImageRequest: UploadImageRequest; filePath is a local path or a URL
          :param isLocalFile: bool, True for a local file, False for a URL that is
              downloaded to a temp file first
          :return: tuple (ImageId, ImageURL)
          """
          if not isLocalFile:
              uploadImageRequest = self.__downloadWebMedia(uploadImageRequest)
          try:
              uploadInfo = self.__createUploadImage(uploadImageRequest)
              self.__uploadOssObject(uploadImageRequest.filePath, uploadInfo['UploadAddress']['FileName'],
                                     uploadInfo, None)
          finally:
              if not isLocalFile:
                  # Remove the downloaded temp file even when the upload failed.
                  self.__removeTempFile(uploadImageRequest.filePath)
          return uploadInfo['ImageId'], uploadInfo['ImageURL']

      @catch_error
      def uploadAttachedMedia(self, uploadAttachedRequest, isLocalFile=True):
          """
          Upload a local or remote attached-media file (e.g. watermark, subtitle) to VoD.

          :param uploadAttachedRequest: UploadAttachedMediaRequest; filePath is a local path or a URL
          :param isLocalFile: bool, True for a local file, False for a URL that is
              downloaded to a temp file first
          :return: dict with keys 'MediaId', 'MediaURL' and 'FileURL'
          """
          if not isLocalFile:
              uploadAttachedRequest = self.__downloadWebMedia(uploadAttachedRequest)
          try:
              uploadInfo = self.__createUploadAttachedMedia(uploadAttachedRequest)
              self.__uploadOssObject(uploadAttachedRequest.filePath, uploadInfo['UploadAddress']['FileName'],
                                     uploadInfo, None)
          finally:
              if not isLocalFile:
                  # Remove the downloaded temp file even when the upload failed.
                  self.__removeTempFile(uploadAttachedRequest.filePath)
          return {'MediaId': uploadInfo['MediaId'], 'MediaURL': uploadInfo['MediaURL'],
                  'FileURL': uploadInfo['FileURL']}

      @catch_error
      def parseWebM3u8(self, m3u8FileUrl):
          """
          Fetch a remote m3u8 index and derive the URL of every slice it lists.

          Each non-comment, non-blank line is combined with the index URL through
          AliyunVodUtils.replaceFileName, so slices are assumed to share the index's
          URL prefix; build the list yourself when they do not.

          :param m3u8FileUrl: string, m3u8 URL, e.g. http://host/sample.m3u8
          :return: list of slice URLs
          """
          res = requests.get(m3u8FileUrl, timeout=self.__connTimeout)
          res.raise_for_status()
          sliceFileUrls = []
          for rawLine in res.iter_lines():
              # iter_lines() yields bytes on Python 3; normalise before comparing with str.
              line = self.__decodeLine(rawLine).strip()
              if not line or line.startswith('#'):
                  continue
              sliceFileUrls.append(AliyunVodUtils.replaceFileName(m3u8FileUrl, line))
          return sliceFileUrls

      @catch_error
      def parseLocalM3u8(self, m3u8FilePath):
          """
          Read a local m3u8 index and derive the local path of every slice it lists.

          Each non-comment, non-blank line is combined with the index path through
          AliyunVodUtils.replaceFileName, so slices are assumed to sit next to the
          index; build the list yourself when they do not.

          :param m3u8FilePath: string, local m3u8 path, e.g. /opt/videos/sample.m3u8
          :return: list of slice paths
          """
          m3u8FilePath = AliyunVodUtils.toUnicode(m3u8FilePath)
          sliceFilePaths = []
          with open(m3u8FilePath) as m3u8File:
              for rawLine in m3u8File:
                  sliceFileName = rawLine.strip()
                  if not sliceFileName or sliceFileName.startswith('#'):
                      continue
                  sliceFilePaths.append(AliyunVodUtils.replaceFileName(m3u8FilePath, sliceFileName))
          return sliceFilePaths

      def uploadProgressCallback(self, consumedBytes, totalBytes):
          """
          Default progress callback: log the uploaded byte count and the completed fraction.

          :param consumedBytes: bytes uploaded so far
          :param totalBytes: total bytes to upload; a falsy value logs a rate of 0
          """
          try:
              rate = round(float(consumedBytes) / float(totalBytes), 4) if totalBytes else 0
              logger.info('视频上传中: {} bytes, percent:{}', consumedBytes, rate)
          except Exception as e:
              # Progress logging must never break the upload itself.
              logger.exception("打印视频上传进度回调方法异常: {}", e)

      @staticmethod
      def __decodeLine(line):
          """Return *line* as str, decoding UTF-8 when it is bytes."""
          return line.decode('utf-8') if isinstance(line, bytes) else line

      @staticmethod
      def __removeTempFile(filePath):
          """Delete *filePath* if it exists."""
          if os.path.exists(filePath):
              os.remove(filePath)

      @staticmethod
      def __cleanupM3u8Temp(localDir, filePaths):
          """Delete the given temp files, then remove *localDir* if it exists and is empty."""
          for filePath in filePaths:
              if os.path.exists(filePath):
                  os.remove(filePath)
          if os.path.isdir(localDir) and not os.listdir(localDir):
              os.rmdir(localDir)

      def __downloadWebMedia(self, request):
          """
          Download request.filePath (a URL) to a local temp file and point the request at it.

          :return: the same request object, with filePath set to the downloaded file
          :raises AliyunVodException: when the downloader reports a negative error code
          """
          downloader = AliyunVodDownloader()
          localFileName = "%s.%s" % (AliyunVodUtils.getStringMd5(request.fileName), request.mediaExt)
          err, localFilePath = downloader.downloadFile(request.filePath, localFileName)
          if err < 0:
              raise AliyunVodException('FileDownloadError', 'Download File Error', '')
          request.setFilePath(localFilePath)
          return request

      def __rewriteM3u8File(self, srcM3u8File, dstM3u8File, isSrcLocal=True):
          """
          Write a copy of an m3u8 index in which every slice entry is reduced to its file name.

          :param srcM3u8File: local path (isSrcLocal=True) or URL (isSrcLocal=False) of the index
          :param dstM3u8File: local path the rewritten index is written to
          :param isSrcLocal: whether the source is a local file
          """
          rewritten = []
          if isSrcLocal:
              with open(AliyunVodUtils.toUnicode(srcM3u8File)) as srcFile:
                  for line in srcFile:
                      item = self.__processM3u8Line(line)
                      if item is not None:
                          rewritten.append(item + "\n")
          else:
              res = requests.get(srcM3u8File, timeout=self.__connTimeout)
              res.raise_for_status()
              for line in res.iter_lines():
                  item = self.__processM3u8Line(line)
                  if item is not None:
                      rewritten.append(item + "\n")
          AliyunVodUtils.mkDir(dstM3u8File)
          with open(dstM3u8File, 'w') as f:
              f.write("".join(rewritten))

      def __processM3u8Line(self, line):
          """
          Normalise one m3u8 line: None for blank lines, tags ('#...') unchanged,
          slice entries reduced to their file name.  Accepts str or bytes.
          """
          item = self.__decodeLine(line).strip()
          if len(item) <= 0:
              return None
          if item.startswith('#'):
              return item
          _, fileName = AliyunVodUtils.getFileBriefPath(item)
          return fileName

      def __requestUploadInfo(self, request, mediaType):
          """
          Send *request* to the VoD API and return the decoded response dict.

          The Base64 'UploadAddress' and 'UploadAuth' values are preserved under
          'OriUploadAddress' / 'OriUploadAuth' and replaced by their JSON-decoded
          form.  'MediaType' is added; 'MediaId' (and 'MediaURL' for images) is
          filled in for 'video' and 'image'.
          """
          request.set_accept_format('JSON')
          result = json.loads(self.__vodClient.do_action_with_exception(request).decode('utf-8'))
          result['OriUploadAddress'] = result['UploadAddress']
          result['OriUploadAuth'] = result['UploadAuth']
          result['UploadAddress'] = json.loads(base64.b64decode(result['OriUploadAddress']).decode('utf-8'))
          result['UploadAuth'] = json.loads(base64.b64decode(result['OriUploadAuth']).decode('utf-8'))
          result['MediaType'] = mediaType
          if mediaType == 'video':
              result['MediaId'] = result['VideoId']
          elif mediaType == 'image':
              result['MediaId'] = result['ImageId']
              result['MediaURL'] = result['ImageURL']
          return result

      @staticmethod
      def __copyOptionalFields(request, uploadRequest, fieldNames):
          """
          Copy the truncated description and each truthy attribute in *fieldNames*
          from *uploadRequest* onto the SDK *request*.

          The setter name is derived from the attribute name by upper-casing its
          first letter, e.g. 'cateId' -> request.set_CateId(...).
          """
          if uploadRequest.description:
              request.set_Description(
                  AliyunVodUtils.subString(uploadRequest.description, VOD_MAX_DESCRIPTION_LENGTH))
          for fieldName in fieldNames:
              value = getattr(uploadRequest, fieldName)
              if value:
                  getattr(request, 'set_' + fieldName[0].upper() + fieldName[1:])(value)

      def __createUploadVideo(self, uploadVideoRequest):
          """Request the upload address and credentials for a video; return the upload info dict."""
          request = CreateUploadVideoRequest.CreateUploadVideoRequest()
          # The title is always sent for videos, truncated to the maximum length.
          request.set_Title(AliyunVodUtils.subString(uploadVideoRequest.title, VOD_MAX_TITLE_LENGTH))
          request.set_FileName(uploadVideoRequest.fileName)
          self.__copyOptionalFields(request, uploadVideoRequest, (
              'coverURL', 'tags', 'cateId', 'templateGroupId', 'storageLocation',
              'userData', 'appId', 'workflowId'))
          result = self.__requestUploadInfo(request, 'video')
          logger.info("CreateUploadVideo, FilePath: {}, VideoId: {}", uploadVideoRequest.filePath,
                      result['VideoId'])
          return result

      def __refresh_upload_video(self, videoId):
          """Request fresh upload credentials for an existing video; return the upload info dict."""
          request = RefreshUploadVideoRequest.RefreshUploadVideoRequest()
          request.set_VideoId(videoId)
          result = self.__requestUploadInfo(request, 'video')
          logger.info("RefreshUploadVideo, VideoId:{}", result['VideoId'])
          return result

      def __createUploadImage(self, uploadImageRequest):
          """Request the upload address and credentials for an image; return the upload info dict."""
          request = CreateUploadImageRequest.CreateUploadImageRequest()
          request.set_ImageType(uploadImageRequest.imageType)
          request.set_ImageExt(uploadImageRequest.imageExt)
          if uploadImageRequest.title:
              request.set_Title(AliyunVodUtils.subString(uploadImageRequest.title, VOD_MAX_TITLE_LENGTH))
          self.__copyOptionalFields(request, uploadImageRequest, (
              'tags', 'cateId', 'storageLocation', 'userData', 'appId', 'workflowId'))
          result = self.__requestUploadInfo(request, 'image')
          logger.info("CreateUploadImage, FilePath: {}, ImageId: {}, ImageUrl: {}",
                      uploadImageRequest.filePath, result['ImageId'], result['ImageURL'])
          return result

      def __createUploadAttachedMedia(self, uploadAttachedRequest):
          """Request the upload address and credentials for an attached-media file."""
          request = CreateUploadAttachedMediaRequest.CreateUploadAttachedMediaRequest()
          request.set_BusinessType(uploadAttachedRequest.businessType)
          request.set_MediaExt(uploadAttachedRequest.mediaExt)
          if uploadAttachedRequest.title:
              request.set_Title(AliyunVodUtils.subString(uploadAttachedRequest.title, VOD_MAX_TITLE_LENGTH))
          self.__copyOptionalFields(request, uploadAttachedRequest, (
              'tags', 'cateId', 'storageLocation', 'userData', 'appId', 'workflowId'))
          result = self.__requestUploadInfo(request, 'attached')
          logger.info("CreateUploadAttachedMedia, FilePath: {}, MediaId: {}, MediaURL: {}",
                      uploadAttachedRequest.filePath, result['MediaId'], result['MediaURL'])
          return result

      def __getUploadHeaders(self, uploadVideoRequest):
          """
          Build the extra OSS headers for a video upload.

          :return: None when isShowWatermark is None, otherwise a dict whose
              'x-oss-notification' value is the Base64 encoding (bytes) of a JSON
              document carrying the IsShowWaterMark flag
          """
          if uploadVideoRequest.isShowWatermark is None:
              return None
          userData = "{\"Vod\":{\"UserData\":{\"IsShowWaterMark\": \"%s\"}}}" % (uploadVideoRequest.isShowWatermark)
          # b64encode needs bytes on Python 3; its second positional argument is
          # `altchars`, not an encoding.
          return {'x-oss-notification': base64.b64encode(userData.encode('utf-8'))}

      def __uploadOssObjectWithRetry(self, filePath, object, uploadInfo, headers=None):
          """
          Upload one file to OSS, retrying on OssError up to the configured retry count.

          When the error code shows that the temporary credentials are no longer
          valid, they are refreshed before the next attempt.  Any other exception
          type propagates unchanged.

          :raises Exception: when every attempt failed with an OssError; the last
              OssError is attached as the cause
          """
          lastError = None
          for attempt in range(1, self.__maxRetryTimes + 1):
              try:
                  return self.__uploadOssObject(filePath, object, uploadInfo, headers)
              except OssError as e:
                  lastError = e
                  logger.warning("Upload to OSS failed (attempt {}/{}): {}", attempt, self.__maxRetryTimes, e)
                  # Expired upload credentials have to be re-issued before retrying.
                  if e.code == 'SecurityTokenExpired' or e.code == 'InvalidAccessKeyId':
                      uploadInfo = self.__refresh_upload_video(uploadInfo['MediaId'])
          raise Exception("重试超过限制") from lastError

      def __uploadOssObject(self, filePath, object, uploadInfo, headers=None):
          """
          Upload one file to OSS with a fresh bucket client and a new _VodResumableUploader.

          The uploader is stored in self.uploader.

          :return: whatever the uploader's upload() returns
          """
          self.__createOssClient(uploadInfo['UploadAuth'], uploadInfo['UploadAddress'])
          if self.__progress_callback is None:
              self.__progress_callback = self.uploadProgressCallback
          self.uploader = _VodResumableUploader(self.__bucketClient, filePath, object, uploadInfo, headers,
                                                self.__progress_callback, self.__refreshUploadAuth)
          self.uploader.setMultipartInfo(self.__multipartThreshold, self.__multipartPartSize,
                                         self.__multipartThreadsNum)
          self.uploader.setClientId(self.__accessKeyId)
          res = self.uploader.upload()
          uploadAddress = uploadInfo['UploadAddress']
          bucketHost = uploadAddress['Endpoint'].replace('://', '://' + uploadAddress['Bucket'] + ".")
          logger.info("UploadFile {} Finish, MediaId: {}, FilePath: {}, Destination: {}/{}",
                      uploadInfo['MediaType'], uploadInfo['MediaId'], filePath, bucketHost, object)
          return res

      def __createOssClient(self, uploadAuth, uploadAddress):
          """
          Build the OSS bucket client from decoded upload credentials and address.

          Both arguments must already be Base64- and JSON-decoded.  The endpoint is
          passed through AliyunVodUtils.convertOssInternal together with the
          configured ECS region.

          :return: the new bucket client (also stored on the instance)
          """
          auth = oss2.StsAuth(uploadAuth['AccessKeyId'], uploadAuth['AccessKeySecret'],
                              uploadAuth['SecurityToken'])
          endpoint = AliyunVodUtils.convertOssInternal(uploadAddress['Endpoint'], self.__ecsRegion)
          self.__bucketClient = oss2.Bucket(auth, endpoint, uploadAddress['Bucket'],
                                            connect_timeout=self.__connTimeout, enable_crc=self.__EnableCrc)
          return self.__bucketClient

      def __refreshUploadAuth(self, videoId):
          """Refresh the upload credentials for *videoId* and return a bucket client built from them."""
          uploadInfo = self.__refresh_upload_video(videoId)
          return self.__createOssClient(uploadInfo['UploadAuth'], uploadInfo['UploadAddress'])
  486. from oss2 import SizedFileAdapter, determine_part_size
  487. from oss2.models import PartInfo
  488. from aliyunsdkcore.utils import parameter_helper as helper
  class _VodResumableUploader:
      """
      Uploads one local file to an OSS bucket, as a single PUT or as a multipart upload.

      ``status`` is a public flag: when it becomes truthy, multipartUpload stops
      before uploading the next part and returns None (the multipart upload is
      neither completed nor aborted).
      """

      def __init__(self, bucket, filePath, object, uploadInfo, headers, progressCallback, refreshAuthCallback):
          """
          :param bucket: OSS bucket client used for the transfer
          :param filePath: local path of the file to upload (must exist; its mtime is read here)
          :param object: OSS object key to write
          :param uploadInfo: upload info dict; 'MediaType', 'MediaId' and 'OriUploadAddress' are read
          :param headers: extra OSS headers, or None
          :param progressCallback: callable(uploadedBytes, totalBytes) called after each part
          :param refreshAuthCallback: callable(mediaId) returning a bucket client with fresh credentials
          """
          self.__bucket = bucket
          self.__filePath = filePath
          self.__object = object
          self.__uploadInfo = uploadInfo
          self.__totalSize = None
          self.__headers = headers
          self.__mtime = os.path.getmtime(filePath)
          self.__progressCallback = progressCallback
          self.__refreshAuthCallback = refreshAuthCallback
          self.__threshold = None
          self.__partSize = None
          self.__threadsNum = None
          self.__uploadId = 0
          self.__record = {}
          self.__finishedSize = 0
          self.__finishedParts = []
          self.__filePartHash = None
          self.__clientId = None
          self.status = False

      def setMultipartInfo(self, threshold, partSize, threadsNum):
          """Store the multipart threshold, preferred part size and thread count."""
          self.__threshold = threshold
          self.__partSize = partSize
          self.__threadsNum = threadsNum

      def setClientId(self, clientId):
          """Store the client id sent with progress reports."""
          self.__clientId = clientId

      def upload(self):
          """
          Upload the file: a single PUT when a threshold is set and the file is not
          larger than it, otherwise a multipart upload.

          :return: the result of simpleUpload() or multipartUpload()
          """
          self.__totalSize = os.path.getsize(self.__filePath)
          logger.info("上传视频路径: {}, 视频大小: {}", self.__filePath, self.__totalSize)
          if self.__threshold and self.__totalSize <= self.__threshold:
              return self.simpleUpload()
          return self.multipartUpload()

      def simpleUpload(self):
          """
          Upload the whole file with one put_object call; for videos, report the progress afterwards.

          :return: the put_object result
          """
          with open(AliyunVodUtils.toUnicode(self.__filePath), 'rb') as f:
              result = self.__bucket.put_object(self.__object, f, headers=self.__headers,
                                                progress_callback=None)
          if self.__uploadInfo['MediaType'] == 'video':
              self.__reportUploadProgress('put', 1, self.__totalSize)
          return result

      def multipartUpload(self):
          """
          Upload the file part by part, then complete the multipart upload.

          After each part the progress callback is invoked.  For videos the
          progress is also reported and the bucket client is replaced through the
          refresh callback once 2500 seconds have passed since the last refresh.

          :return: the result of the last upload_part call; None when the file is
              empty or when ``status`` was set during the upload
          """
          psize = oss2.determine_part_size(self.__totalSize, preferred_size=self.__partSize)
          self.__uploadId = self.__bucket.init_multipart_upload(self.__object).upload_id
          startTime = time.time()
          expireSeconds = 2500  # refresh ahead of the 3000-second credential lifetime
          result = None  # stays None for a zero-length file
          with open(AliyunVodUtils.toUnicode(self.__filePath), 'rb') as fileObj:
              partNumber = 1
              offset = 0
              while offset < self.__totalSize:
                  if self.status:
                      # Stop requested from outside; nothing is completed or aborted.
                      return None
                  uploadSize = min(psize, self.__totalSize - offset)
                  result = self.__upload_part(partNumber, fileObj, uploadSize)
                  self.__finishedParts.append(PartInfo(partNumber, result.etag))
                  offset += uploadSize
                  partNumber += 1
                  self.__progressCallback(offset, self.__totalSize)
                  if self.__uploadInfo['MediaType'] == 'video':
                      self.__reportUploadProgress('multipart', partNumber - 1, offset)
                      # The refresh callback is keyed by MediaId, so the credential
                      # check sits with the other video-only steps.
                      nowTime = time.time()
                      if nowTime - startTime >= expireSeconds:
                          self.__bucket = self.__refreshAuthCallback(self.__uploadInfo['MediaId'])
                          startTime = nowTime
          self.__complete_multipart_upload()
          return result

      def __upload_part(self, partNumber, fileObj, uploadSize):
          """
          Upload one part, making up to 5 attempts with a 1-second pause between failures.

          The file position is restored before every attempt so that a retry sends
          the same bytes as the first attempt.

          :return: the upload_part result
          :raises Exception: when all attempts failed
          """
          maxAttempts = 5
          startPos = fileObj.tell()
          for attempt in range(1, maxAttempts + 1):
              try:
                  # A failed attempt may have consumed part of the data; rewind first.
                  fileObj.seek(startPos)
                  return self.__bucket.upload_part(self.__object, self.__uploadId, partNumber,
                                                   SizedFileAdapter(fileObj, uploadSize))
              except Exception as e:
                  logger.error("阿里云分片上传异常报错: {}, 当前重试次数:{}", format_exc(), attempt)
                  if attempt >= maxAttempts:
                      raise Exception("阿里云分片上传异常") from e
                  time.sleep(1)

      def __complete_multipart_upload(self):
          """
          Complete the multipart upload, making up to 7 attempts with a 1-second pause between failures.

          :raises Exception: when all attempts failed
          """
          maxAttempts = 7
          for attempt in range(1, maxAttempts + 1):
              try:
                  self.__bucket.complete_multipart_upload(self.__object, self.__uploadId,
                                                          self.__finishedParts, headers=self.__headers)
                  return
              except Exception as e:
                  logger.error("阿里云完成分片上传异常报错: {}, 当前重试次数:{}", str(e), attempt)
                  if attempt >= maxAttempts:
                      raise Exception("阿里云完成分片上传异常") from e
                  time.sleep(1)

      def __reportUploadProgress(self, uploadMethod, donePartsCount, doneBytes):
          """
          POST an upload progress report for the current media.

          A failed report is retried after a 1-second pause; once more than 5
          attempts have failed the last exception is re-raised, which aborts the
          upload in progress.

          :param uploadMethod: 'put' or 'multipart'
          :param donePartsCount: number of parts uploaded so far
          :param doneBytes: number of bytes uploaded so far
          """
          reportHost = 'vod.cn-shanghai.aliyuncs.com'
          sdkVersion = '1.3.1'
          reportKey = 'HBL9nnSwhtU2$STX'  # combined with client id and timestamp into AuthInfo
          maxRetries = 5
          failures = 0
          while True:
              try:
                  uploadPoint = {'upMethod': uploadMethod, 'partSize': self.__partSize, 'doneBytes': doneBytes}
                  # Recomputed on every attempt so AuthTimestamp matches the send time.
                  timestamp = int(time.time())
                  authInfo = AliyunVodUtils.getStringMd5("%s|%s|%s" % (self.__clientId, reportKey, timestamp))
                  fields = {'Action': 'ReportUploadProgress', 'Format': 'JSON', 'Version': '2017-03-21',
                            'Timestamp': helper.get_iso_8061_date(), 'SignatureNonce': helper.get_uuid(),
                            'VideoId': self.__uploadInfo['MediaId'], 'Source': 'PythonSDK',
                            'ClientId': self.__clientId,
                            'BusinessType': 'UploadVideo', 'TerminalType': 'PC', 'DeviceModel': 'Server',
                            'AppVersion': sdkVersion, 'AuthTimestamp': timestamp, 'AuthInfo': authInfo,
                            'FileName': self.__filePath,
                            'FileHash': self.__getFilePartHash(self.__clientId, self.__filePath,
                                                               self.__totalSize),
                            'FileSize': self.__totalSize, 'FileCreateTime': timestamp, 'UploadRatio': 0,
                            'UploadId': self.__uploadId,
                            'DonePartsCount': donePartsCount, 'PartSize': self.__partSize,
                            'UploadPoint': json.dumps(uploadPoint),
                            'UploadAddress': self.__uploadInfo['OriUploadAddress']
                            }
                  requests.post('http://' + reportHost, fields, timeout=30)
                  return
              except Exception as e:
                  failures += 1
                  time.sleep(1)
                  logger.error("vod上报视频进度异常: {}, 当前重试次数:{}", repr(e), failures)
                  if failures > maxRetries:
                      logger.error("vod上报视频重试失败 {}", repr(e))
                      raise

      def __getFilePartHash(self, clientId, filePath, fileSize):
          """
          Return a hash identifying the file, computed once and cached.

          The hash is the MD5 of at most the first 1 MiB of the file.  When that
          cannot be computed, the fallback is the string
          "<clientId>|<filePath>|<mtime>".
          """
          if self.__filePartHash:
              return self.__filePartHash
          length = min(1 * 1024 * 1024, fileSize)
          try:
              with open(AliyunVodUtils.toUnicode(filePath), 'rb') as fp:
                  strVal = fp.read(length)
              self.__filePartHash = AliyunVodUtils.getStringMd5(strVal, False)
          except Exception:
              # Best effort: fall back to an identifier that needs no file access.
              self.__filePartHash = "%s|%s|%s" % (clientId, filePath, self.__mtime)
          return self.__filePartHash