You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

305 lines
12KB

  1. # -*- coding: utf-8 -*-
  2. """
  3. oss2.iterators
  4. ~~~~~~~~~~~~~~
  5. 该模块包含了一些易于使用的迭代器,可以用来遍历Bucket、文件、分片上传等。
  6. """
  7. from .models import MultipartUploadInfo, SimplifiedObjectInfo
  8. from .exceptions import ServerError
  9. from . import defaults, http
  10. class _BaseIterator(object):
  11. def __init__(self, marker, max_retries):
  12. self.is_truncated = True
  13. self.next_marker = marker
  14. max_retries = defaults.get(max_retries, defaults.request_retries)
  15. self.max_retries = max_retries if max_retries > 0 else 1
  16. self.entries = []
  17. def _fetch(self):
  18. raise NotImplemented # pragma: no cover
  19. def __iter__(self):
  20. return self
  21. def __next__(self):
  22. while True:
  23. if self.entries:
  24. return self.entries.pop(0)
  25. if not self.is_truncated:
  26. raise StopIteration
  27. self.fetch_with_retry()
  28. def next(self):
  29. return self.__next__()
  30. def fetch_with_retry(self):
  31. for i in range(self.max_retries):
  32. try:
  33. self.is_truncated, self.next_marker = self._fetch()
  34. except ServerError as e:
  35. if e.status // 100 != 5:
  36. raise
  37. if i == self.max_retries - 1:
  38. raise
  39. else:
  40. return
  41. class BucketIterator(_BaseIterator):
  42. """遍历用户Bucket的迭代器。
  43. 每次迭代返回的是 :class:`SimplifiedBucketInfo <oss2.models.SimplifiedBucketInfo>` 对象。
  44. :param service: :class:`Service <oss2.Service>` 对象
  45. :param prefix: 只列举匹配该前缀的Bucket
  46. :param marker: 分页符。只列举Bucket名字典序在此之后的Bucket
  47. :param max_keys: 每次调用 `list_buckets` 时的max_keys参数。注意迭代器返回的数目可能会大于该值。
  48. """
  49. def __init__(self, service, prefix='', marker='', max_keys=100, max_retries=None):
  50. super(BucketIterator, self).__init__(marker, max_retries)
  51. self.service = service
  52. self.prefix = prefix
  53. self.max_keys = max_keys
  54. def _fetch(self):
  55. result = self.service.list_buckets(prefix=self.prefix,
  56. marker=self.next_marker,
  57. max_keys=self.max_keys)
  58. self.entries = result.buckets
  59. return result.is_truncated, result.next_marker
  60. class ObjectIterator(_BaseIterator):
  61. """遍历Bucket里文件的迭代器。
  62. 每次迭代返回的是 :class:`SimplifiedObjectInfo <oss2.models.SimplifiedObjectInfo>` 对象。
  63. 当 `SimplifiedObjectInfo.is_prefix()` 返回True时,表明是公共前缀(目录)。
  64. :param bucket: :class:`Bucket <oss2.Bucket>` 对象
  65. :param prefix: 只列举匹配该前缀的文件
  66. :param delimiter: 目录分隔符
  67. :param marker: 分页符
  68. :param max_keys: 每次调用 `list_objects` 时的max_keys参数。注意迭代器返回的数目可能会大于该值。
  69. :param headers: HTTP头部
  70. :type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
  71. """
  72. def __init__(self, bucket, prefix='', delimiter='', marker='', max_keys=100, max_retries=None, headers=None):
  73. super(ObjectIterator, self).__init__(marker, max_retries)
  74. self.bucket = bucket
  75. self.prefix = prefix
  76. self.delimiter = delimiter
  77. self.max_keys = max_keys
  78. self.headers = http.CaseInsensitiveDict(headers)
  79. def _fetch(self):
  80. result = self.bucket.list_objects(prefix=self.prefix,
  81. delimiter=self.delimiter,
  82. marker=self.next_marker,
  83. max_keys=self.max_keys,
  84. headers=self.headers)
  85. self.entries = result.object_list + [SimplifiedObjectInfo(prefix, None, None, None, None, None)
  86. for prefix in result.prefix_list]
  87. self.entries.sort(key=lambda obj: obj.key)
  88. return result.is_truncated, result.next_marker
  89. class ObjectIteratorV2(_BaseIterator):
  90. """遍历Bucket里文件的迭代器。
  91. 每次迭代返回的是 :class:`SimplifiedObjectInfo <oss2.models.SimplifiedObjectInfo>` 对象。
  92. 当 `SimplifiedObjectInfo.is_prefix()` 返回True时,表明是公共前缀(目录)。
  93. :param str prefix: 只罗列文件名为该前缀的文件
  94. :param str delimiter: 分隔符。可以用来模拟目录
  95. :param str continuation_token: 分页标志。首次调用传空串,后续使用返回值的next_continuation_token
  96. :param str start_after: 起始文件名称,OSS会按照文件的字典序排列返回start_after之后的文件。
  97. :param bool fetch_owner: 是否获取文件的owner信息,默认不返回。
  98. :param int max_keys: 最多返回文件的个数,文件和目录的和不能超过该值
  99. :param headers: HTTP头部
  100. :type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
  101. """
  102. def __init__(self, bucket, prefix='', delimiter='', continuation_token='', start_after='', fetch_owner = False, encoding_type = 'url', max_keys=100, max_retries=None, headers=None):
  103. super(ObjectIteratorV2, self).__init__(continuation_token, max_retries)
  104. self.bucket = bucket
  105. self.prefix = prefix
  106. self.delimiter = delimiter
  107. self.start_after = start_after
  108. self.fetch_owner = fetch_owner
  109. self.encoding_type = encoding_type
  110. self.max_keys = max_keys
  111. self.headers = http.CaseInsensitiveDict(headers)
  112. def _fetch(self):
  113. result = self.bucket.list_objects_v2(prefix=self.prefix,
  114. delimiter=self.delimiter,
  115. continuation_token=self.next_marker,
  116. start_after=self.start_after,
  117. fetch_owner=self.fetch_owner,
  118. encoding_type=self.encoding_type,
  119. max_keys=self.max_keys,
  120. headers=self.headers)
  121. self.entries = result.object_list + [SimplifiedObjectInfo(prefix, None, None, None, None, None)
  122. for prefix in result.prefix_list]
  123. self.entries.sort(key=lambda obj: obj.key)
  124. return result.is_truncated, result.next_continuation_token
  125. class MultipartUploadIterator(_BaseIterator):
  126. """遍历Bucket里未完成的分片上传。
  127. 每次返回 :class:`MultipartUploadInfo <oss2.models.MultipartUploadInfo>` 对象。
  128. 当 `MultipartUploadInfo.is_prefix()` 返回True时,表明是公共前缀(目录)。
  129. :param bucket: :class:`Bucket <oss2.Bucket>` 对象
  130. :param prefix: 仅列举匹配该前缀的文件的分片上传
  131. :param delimiter: 目录分隔符
  132. :param key_marker: 文件名分页符
  133. :param upload_id_marker: 分片上传ID分页符
  134. :param max_uploads: 每次调用 `list_multipart_uploads` 时的max_uploads参数。注意迭代器返回的数目可能会大于该值。
  135. :param headers: HTTP头部
  136. :type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
  137. """
  138. def __init__(self, bucket,
  139. prefix='', delimiter='', key_marker='', upload_id_marker='',
  140. max_uploads=1000, max_retries=None, headers=None):
  141. super(MultipartUploadIterator, self).__init__(key_marker, max_retries)
  142. self.bucket = bucket
  143. self.prefix = prefix
  144. self.delimiter = delimiter
  145. self.next_upload_id_marker = upload_id_marker
  146. self.max_uploads = max_uploads
  147. self.headers = http.CaseInsensitiveDict(headers)
  148. def _fetch(self):
  149. result = self.bucket.list_multipart_uploads(prefix=self.prefix,
  150. delimiter=self.delimiter,
  151. key_marker=self.next_marker,
  152. upload_id_marker=self.next_upload_id_marker,
  153. max_uploads=self.max_uploads,
  154. headers=self.headers)
  155. self.entries = result.upload_list + [MultipartUploadInfo(prefix, None, None) for prefix in result.prefix_list]
  156. self.entries.sort(key=lambda u: u.key)
  157. self.next_upload_id_marker = result.next_upload_id_marker
  158. return result.is_truncated, result.next_key_marker
  159. class ObjectUploadIterator(_BaseIterator):
  160. """遍历一个Object所有未完成的分片上传。
  161. 每次返回 :class:`MultipartUploadInfo <oss2.models.MultipartUploadInfo>` 对象。
  162. 当 `MultipartUploadInfo.is_prefix()` 返回True时,表明是公共前缀(目录)。
  163. :param bucket: :class:`Bucket <oss2.Bucket>` 对象
  164. :param key: 文件名
  165. :param max_uploads: 每次调用 `list_multipart_uploads` 时的max_uploads参数。注意迭代器返回的数目可能会大于该值。
  166. :param headers: HTTP头部
  167. :type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
  168. """
  169. def __init__(self, bucket, key, max_uploads=1000, max_retries=None, headers=None):
  170. super(ObjectUploadIterator, self).__init__('', max_retries)
  171. self.bucket = bucket
  172. self.key = key
  173. self.next_upload_id_marker = ''
  174. self.max_uploads = max_uploads
  175. self.headers = http.CaseInsensitiveDict(headers)
  176. def _fetch(self):
  177. result = self.bucket.list_multipart_uploads(prefix=self.key,
  178. key_marker=self.next_marker,
  179. upload_id_marker=self.next_upload_id_marker,
  180. max_uploads=self.max_uploads,
  181. headers=self.headers)
  182. self.entries = [u for u in result.upload_list if u.key == self.key]
  183. self.next_upload_id_marker = result.next_upload_id_marker
  184. if not result.is_truncated or not self.entries:
  185. return False, result.next_key_marker
  186. if result.next_key_marker > self.key:
  187. return False, result.next_key_marker
  188. return result.is_truncated, result.next_key_marker
  189. class PartIterator(_BaseIterator):
  190. """遍历一个分片上传会话中已经上传的分片。
  191. 每次返回 :class:`PartInfo <oss2.models.PartInfo>` 对象。
  192. :param bucket: :class:`Bucket <oss2.Bucket>` 对象
  193. :param key: 文件名
  194. :param upload_id: 分片上传ID
  195. :param marker: 分页符
  196. :param max_parts: 每次调用 `list_parts` 时的max_parts参数。注意迭代器返回的数目可能会大于该值。
  197. :param headers: HTTP头部
  198. :type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
  199. """
  200. def __init__(self, bucket, key, upload_id,
  201. marker='0', max_parts=1000, max_retries=None, headers=None):
  202. super(PartIterator, self).__init__(marker, max_retries)
  203. self.bucket = bucket
  204. self.key = key
  205. self.upload_id = upload_id
  206. self.max_parts = max_parts
  207. self.headers = http.CaseInsensitiveDict(headers)
  208. def _fetch(self):
  209. result = self.bucket.list_parts(self.key, self.upload_id,
  210. marker=self.next_marker,
  211. max_parts=self.max_parts,
  212. headers=self.headers)
  213. self.entries = result.parts
  214. return result.is_truncated, result.next_marker
  215. class LiveChannelIterator(_BaseIterator):
  216. """遍历Bucket里文件的迭代器。
  217. 每次迭代返回的是 :class:`LiveChannelInfo <oss2.models.LiveChannelInfo>` 对象。
  218. :param bucket: :class:`Bucket <oss2.Bucket>` 对象
  219. :param prefix: 只列举匹配该前缀的文件
  220. :param marker: 分页符
  221. :param max_keys: 每次调用 `list_live_channel` 时的max_keys参数。注意迭代器返回的数目可能会大于该值。
  222. """
  223. def __init__(self, bucket, prefix='', marker='', max_keys=100, max_retries=None):
  224. super(LiveChannelIterator, self).__init__(marker, max_retries)
  225. self.bucket = bucket
  226. self.prefix = prefix
  227. self.max_keys = max_keys
  228. def _fetch(self):
  229. result = self.bucket.list_live_channel(prefix=self.prefix,
  230. marker=self.next_marker,
  231. max_keys=self.max_keys)
  232. self.entries = result.channels
  233. return result.is_truncated, result.next_marker