用kafka接收消息
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

326 lines
11KB

  1. # -*- coding: UTF-8 -*-
  2. import os,sys
  3. import hashlib
  4. import datetime
  5. import functools
  6. import logging
  7. from oss2.exceptions import OssError
  8. from aliyunsdkcore.acs_exception.exceptions import ServerException
  9. from aliyunsdkcore.acs_exception.exceptions import ClientException
  10. import traceback
  11. import requests
  12. if sys.version_info[0] == 3:
  13. import urllib.parse
  14. else:
  15. from urllib import unquote
  16. VOD_PRINT_INFO_LOG_SWITCH = 1
  17. class AliyunVodLog:
  18. """
  19. VOD日志类,基于logging实现
  20. """
  21. @staticmethod
  22. def printLogStr(msg, *args, **kwargs):
  23. if VOD_PRINT_INFO_LOG_SWITCH:
  24. print("[%s]%s" % (AliyunVodUtils.getCurrentTimeStr(), msg))
  25. @staticmethod
  26. def info(msg, *args, **kwargs):
  27. logging.info(msg, *args, **kwargs)
  28. AliyunVodLog.printLogStr(msg, *args, **kwargs)
  29. @staticmethod
  30. def error(msg, *args, **kwargs):
  31. logging.error(msg, *args, **kwargs)
  32. AliyunVodLog.printLogStr(msg, *args, **kwargs)
  33. @staticmethod
  34. def warning(msg, *args, **kwargs):
  35. logging.warning(msg, *args, **kwargs)
  36. AliyunVodLog.printLogStr(msg, *args, **kwargs)
  37. logger = AliyunVodLog
  38. class AliyunVodUtils:
  39. """
  40. VOD上传SDK的工具类,提供截取字符串、获取扩展名、获取文件名等静态函数
  41. """
  42. # 截取字符串,在不超过最大字节数前提下确保中文字符不被截断出现乱码(先转换成unicode,再取子串,然后转换成utf-8)
  43. @staticmethod
  44. def subString(strVal, maxBytes, charSet='utf-8'):
  45. i = maxBytes
  46. if sys.version_info[0] == 3:
  47. while len(strVal.encode(charSet)) > maxBytes:
  48. if i < 0:
  49. return ''
  50. strVal = strVal[:i]
  51. i -= 1
  52. else:
  53. while len(strVal) > maxBytes:
  54. if i < 0:
  55. return ''
  56. strVal = strVal.decode(charSet)[:i].encode(charSet)
  57. i -= 1
  58. return strVal
  59. @staticmethod
  60. def getFileExtension(fileName):
  61. end = fileName.rfind('?')
  62. if end <= 0:
  63. end = len(fileName)
  64. i = fileName.rfind('.')
  65. if i >= 0:
  66. return fileName[i+1:end].lower()
  67. else:
  68. return None
  69. # urldecode
  70. @staticmethod
  71. def urlDecode(fileUrl):
  72. if sys.version_info[0] == 3:
  73. return urllib.parse.unquote(fileUrl)
  74. else:
  75. return unquote(fileUrl)
  76. # urlencode
  77. @staticmethod
  78. def urlEncode(fileUrl):
  79. if sys.version_info[0] == 3:
  80. return urllib.parse.urlencode(fileUrl)
  81. else:
  82. return urllib.urlencode(fileUrl)
  83. # 获取Url的摘要地址(去除?后的参数,如果有)以及文件名
  84. @staticmethod
  85. def getFileBriefPath(fileUrl):
  86. #fileUrl = AliyunVodUtils.urlDecode(fileUrl)
  87. i = fileUrl.rfind('?')
  88. if i > 0:
  89. briefPath = fileUrl[:i]
  90. else:
  91. briefPath = fileUrl
  92. briefName = os.path.basename(briefPath)
  93. return briefPath, AliyunVodUtils.urlDecode(briefName)
  94. @staticmethod
  95. def getStringMd5(strVal, isEncode=True):
  96. m = hashlib.md5()
  97. m.update(strVal.encode('utf-8') if isEncode else strVal)
  98. return m.hexdigest()
  99. @staticmethod
  100. def getCurrentTimeStr():
  101. now = datetime.datetime.now()
  102. return now.strftime("%Y-%m-%d %H:%M:%S")
  103. # 将oss地址转换为内网地址(如果脚本部署的ecs与oss bucket在同一区域)
  104. @staticmethod
  105. def convertOssInternal(ossUrl, ecsRegion=None, isVpc=False):
  106. if (not ossUrl) or (not ecsRegion):
  107. return ossUrl
  108. availableRegions = ['cn-qingdao', 'cn-beijing', 'cn-zhangjiakou', 'cn-huhehaote', 'cn-hangzhou', 'cn-shanghai', 'cn-shenzhen',
  109. 'cn-hongkong', 'ap-southeast-1', 'ap-southeast-2', 'ap-southeast-3',
  110. 'ap-northeast-1', 'us-west-1', 'us-east-1', 'eu-central-1', 'me-east-1']
  111. if ecsRegion not in availableRegions:
  112. return ossUrl
  113. ossUrl = ossUrl.replace("https:", "http:")
  114. if isVpc:
  115. return ossUrl.replace("oss-%s.aliyuncs.com" % (ecsRegion), "vpc100-oss-%s.aliyuncs.com" % (ecsRegion))
  116. else:
  117. return ossUrl.replace("oss-%s.aliyuncs.com" % (ecsRegion), "oss-%s-internal.aliyuncs.com" % (ecsRegion))
  118. # 把输入转换为unicode
  119. @staticmethod
  120. def toUnicode(data):
  121. if isinstance(data, bytes):
  122. return data.decode('utf-8')
  123. else:
  124. return data
  125. # 替换路径中的文件名;考虑分隔符为"/" 或 "\"(windows)
  126. @staticmethod
  127. def replaceFileName(filePath, replace):
  128. if len(filePath) <= 0 or len(replace) <= 0:
  129. return filePath
  130. filePath = AliyunVodUtils.urlDecode(filePath)
  131. separator = '/'
  132. start = filePath.rfind(separator)
  133. if start < 0:
  134. separator = '\\'
  135. start = filePath.rfind(separator)
  136. if start < 0:
  137. return None
  138. result = "%s%s%s" % (filePath[0:start], separator, replace)
  139. return result
  140. # 创建文件中的目录
  141. @staticmethod
  142. def mkDir(filePath):
  143. if len(filePath) <= 0:
  144. return -1
  145. separator = '/'
  146. i = filePath.rfind(separator)
  147. if i < 0:
  148. separator = '\\'
  149. i = filePath.rfind(separator)
  150. if i < 0:
  151. return -2
  152. dirs = filePath[:i]
  153. if os.path.exists(dirs) and os.path.isdir(dirs):
  154. return 0
  155. os.makedirs(dirs)
  156. return 1
  157. class AliyunVodException(Exception):
  158. """
  159. VOD上传SDK的异常类,做统一的异常处理,外部捕获此异常即可
  160. """
  161. def __init__(self, type, code, msg, http_status=None, request_id=None):
  162. Exception.__init__(self)
  163. self.type = type or 'UnkownError'
  164. self.code = code
  165. self.message = msg
  166. self.http_status = http_status or 'NULL'
  167. self.request_id = request_id or 'NULL'
  168. def __str__(self):
  169. return "Type: %s, Code: %s, Message: %s, HTTPStatus: %s, RequestId: %s" % (
  170. self.type, self.code, self.message, str(self.http_status), self.request_id)
  171. def catch_error(method):
  172. """
  173. 装饰器,将内部异常转换成统一的异常类AliyunVodException
  174. """
  175. @functools.wraps(method)
  176. def wrapper(self, *args, **kwargs):
  177. try:
  178. return method(self, *args, **kwargs)
  179. except ServerException as e:
  180. # 可能原因:AK错误、账号无权限、参数错误等
  181. raise AliyunVodException('ServerException', e.get_error_code(), e.get_error_msg(), e.get_http_status(), e.get_request_id())
  182. logger.error("ServerException: %s", e)
  183. except ClientException as e:
  184. # 可能原因:本地网络故障(如不能连接外网)等
  185. raise AliyunVodException('ClientException', e.get_error_code(), e.get_error_msg())
  186. logger.error("ClientException: %s", e)
  187. except OssError as e:
  188. # 可能原因:上传凭证过期等
  189. raise AliyunVodException('OssError', e.code, e.message, e.status, e.request_id)
  190. logger.error("OssError: %s", e)
  191. except IOError as e:
  192. # 可能原因:文件URL不能访问、本地文件无法读取等
  193. raise AliyunVodException('IOError', repr(e), traceback.format_exc())
  194. logger.error("IOError: %s", traceback.format_exc())
  195. except OSError as e:
  196. # 可能原因:本地文件不存在等
  197. raise AliyunVodException('OSError', repr(e), traceback.format_exc())
  198. logger.error("OSError: %s", traceback.format_exc())
  199. except AliyunVodException as e:
  200. # 可能原因:参数错误
  201. raise e
  202. logger.error("VodException: %s", e)
  203. except Exception as e:
  204. raise AliyunVodException('UnkownException', repr(e), traceback.format_exc())
  205. logger.error("UnkownException: %s", traceback.format_exc())
  206. except:
  207. raise AliyunVodException('UnkownError', 'UnkownError', traceback.format_exc())
  208. logger.error("UnkownError: %s", traceback.format_exc())
  209. return wrapper
  210. class AliyunVodDownloader:
  211. """
  212. VOD网络文件的下载类,上传网络文件时会先下载到本地临时目录,再上传到点播
  213. """
  214. def __init__(self, localDir=None):
  215. if localDir:
  216. self.__localDir = localDir
  217. else:
  218. p = os.path.dirname(os.path.realpath(__file__))
  219. self.__localDir = os.path.dirname(p) + '/dlfiles'
  220. def setSaveLocalDir(self, localDir):
  221. self.__localDir = localDir
  222. def getSaveLocalDir(self):
  223. return self.__localDir
  224. def downloadFile(self, fileUrl, localFileName, fileSize=None):
  225. localPath = self.__localDir + '/' + localFileName
  226. logger.info("Download %s To %s" % (fileUrl, localPath))
  227. try:
  228. lsize = self.getFileSize(localPath)
  229. if fileSize and lsize == fileSize:
  230. logger.info('Download OK, File Exists')
  231. return 0, localPath
  232. AliyunVodUtils.mkDir(self.__localDir)
  233. err, webPage = self.__openWebFile(fileUrl, lsize)
  234. if err == 0:
  235. logger.info('Download OK, File Exists')
  236. webPage.close()
  237. return 0, localPath
  238. fileObj = open(localPath, 'ab+')
  239. for chunk in webPage.iter_content(chunk_size=8 * 1024):
  240. if chunk:
  241. fileObj.write(chunk)
  242. except Exception as e:
  243. logger.error("Download fail: %s" % (e))
  244. return -1, None
  245. fileObj.close()
  246. webPage.close()
  247. logger.info('Download OK')
  248. return 1, localPath
  249. def getFileSize(self, filePath):
  250. try:
  251. lsize = os.stat(filePath).st_size
  252. except:
  253. lsize = 0
  254. return lsize
  255. def __openWebFile(self, fileUrl, offset):
  256. webPage = None
  257. try:
  258. headers = {'Range': 'bytes=%d-' % offset}
  259. webPage = requests.get(fileUrl, stream=True, headers=headers, timeout=120, verify=False)
  260. status_code = webPage.status_code
  261. err = -1
  262. if status_code in [200, 206]:
  263. err = 1
  264. elif status_code == 416:
  265. err = 0
  266. else:
  267. logger.error("Download offset %s fail, invalid url, status: %s" % (offset, status_code))
  268. except Exception as e:
  269. logger.error("Download offset %s fail: %s" % (offset, e))
  270. err = -2
  271. finally:
  272. return err, webPage