You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

236 lines
9.7KB

  1. import struct
  2. from loguru import logger
  3. from .exceptions import SelectOperationFailed
  4. from .exceptions import SelectOperationClientError
  5. from .exceptions import InconsistentError
  6. from . import utils
  """
  The adapter class for a Select object response.

  The response consists of frames. Each frame has the following format:

      Type    | Payload Length | Header Checksum | Payload  | Payload Checksum
      4 bytes | 4 bytes        | 4 bytes         | variable | 4 bytes

  There are five kinds of frames:

  Data Frame
      Type: 8388609
      Payload: Offset (8 bytes) | Data (variable)

  Continuous Frame
      Type: 8388612
      Payload: Offset (8 bytes)

  End Frame
      Type: 8388613
      Payload: Offset (8 bytes) | total scanned bytes (8 bytes) | HTTP status code (4 bytes) | error message (variable)

  Meta End Frame / JSON Meta End Frame
      Type: 8388614 / 8388615
      Payload: Offset (8 bytes) | total scanned bytes (8 bytes) | HTTP status code (4 bytes) | splits (4 bytes)
               | rows (8 bytes) | columns (4 bytes, Meta End Frame only) | error message (variable)
  """
  class SelectResponseAdapter(object):
      """Decode the framed body of an OSS Select object response.

      Iterating an instance (or calling ``read()``) yields the ``bytes`` carried
      by data frames. When the response header ``x-oss-select-output-raw`` is
      ``"true"`` the body is not framed and the underlying response chunks are
      passed through unchanged.
      """

      _CHUNK_SIZE = 8 * 1024
      _CONTINIOUS_FRAME_TYPE = 8388612
      _DATA_FRAME_TYPE = 8388609
      _END_FRAME_TYPE = 8388613
      _META_END_FRAME_TYPE = 8388614
      _JSON_META_END_FRAME_TYPE = 8388615
      # The progress callback is invoked after this many frames have been read.
      _FRAMES_FOR_PROGRESS_UPDATE = 10
      # Every frame type read_next_frame() knows how to decode.
      _KNOWN_FRAME_TYPES = frozenset((
          _DATA_FRAME_TYPE,
          _CONTINIOUS_FRAME_TYPE,
          _END_FRAME_TYPE,
          _META_END_FRAME_TYPE,
          _JSON_META_END_FRAME_TYPE,
      ))

      def __init__(self, response, progress_callback=None, content_length=None, enable_crc=False):
          """Wrap ``response`` for frame-by-frame decoding.

          :param response: HTTP response object; its ``headers.get()`` is read and
              iterating it must yield ``bytes`` chunks of the body.
          :param progress_callback: optional callable, invoked as
              ``progress_callback(file_offset, content_length)``.
          :param content_length: passed through as the second callback argument.
          :param enable_crc: when true, verify the CRC32 of every data-frame payload.
          """
          self.response = response
          self.frame_off_set = 0      # read position inside frame_data
          self.frame_length = 0       # end of the undelivered data in frame_data
          self.frame_data = b''
          self.check_sum_flag = 0
          self.file_offset = 0        # offset field of the most recently read frame
          self.finished = 0           # becomes 1 once an end / meta-end frame is consumed
          self.raw_buffer = b''       # current chunk taken from the response iterator
          self.raw_buffer_offset = 0  # how much of raw_buffer has been consumed
          self.callback = progress_callback
          self.frames_since_last_progress_report = 0
          self.content_length = content_length
          self.resp_content_iter = iter(response)
          self.enable_crc = enable_crc
          self.payload = b''
          self.output_raw_data = response.headers.get("x-oss-select-output-raw", '') == "true"
          self.request_id = response.headers.get("x-oss-request-id", '')
          self.splits = 0
          self.rows = 0
          self.columns = 0

      def read(self):
          """Return all remaining decoded data as one ``bytes`` object.

          Returns ``b''`` if the adapter has already consumed the end frame.
          """
          if self.finished:
              return b''
          # join() avoids the quadratic cost of repeated bytes concatenation.
          return b''.join(self)

      def __iter__(self):
          return self

      def __next__(self):
          return self.next()

      def next(self):
          """Return the next chunk of decoded data.

          :raises StopIteration: once the end frame has been processed, or, in raw
              mode, once the response body is exhausted or yields an empty chunk.
          """
          if self.output_raw_data:
              data = next(self.resp_content_iter)
              if len(data) != 0:
                  return data
              raise StopIteration

          while self.finished == 0:
              if self.frame_off_set < self.frame_length:
                  data = self.frame_data[self.frame_off_set:self.frame_length]
                  self.frame_length = self.frame_off_set = 0
                  return data
              self.read_next_frame()
              self.frames_since_last_progress_report += 1
              if (self.frames_since_last_progress_report >= SelectResponseAdapter._FRAMES_FOR_PROGRESS_UPDATE
                      and self.callback is not None):
                  self.callback(self.file_offset, self.content_length)
                  self.frames_since_last_progress_report = 0
          raise StopIteration

      def read_raw(self, amt):
          """Read up to ``amt`` bytes from the underlying response body.

          Fewer than ``amt`` bytes are returned only when the response iterator is
          exhausted or yields an empty chunk, or when the adapter is already
          finished. An exhausted iterator is treated like an empty chunk. Letting
          ``StopIteration`` escape from here would silently end iteration of this
          adapter and hide a truncated response.
          """
          parts = []
          while amt > 0 and self.finished == 0:
              available = len(self.raw_buffer) - self.raw_buffer_offset
              if available <= 0:
                  self.raw_buffer = next(self.resp_content_iter, b'')
                  self.raw_buffer_offset = 0
                  available = len(self.raw_buffer)
                  if available == 0:
                      break
              take = min(amt, available)
              parts.append(self.raw_buffer[self.raw_buffer_offset:self.raw_buffer_offset + take])
              self.raw_buffer_offset += take
              amt -= take
          return b''.join(parts)

      def _read_exact(self, amt, what):
          """Read exactly ``amt`` bytes, naming the field ``what`` on failure.

          :raises SelectOperationClientError: if the response ends before ``amt``
              bytes could be read.
          """
          data = self.read_raw(amt)
          if len(data) != amt:
              logger.warning("Response ended unexpectedly while reading {0}: expected {1} bytes, got {2}. RequestId:{3}".format(what, amt, len(data), self.request_id))
              raise SelectOperationClientError(self.request_id, "Unexpected end of response while reading " + what)
          return data

      @staticmethod
      def _unpack(fmt, raw):
          """Decode a single integer of struct format ``fmt`` from the frame bytes ``raw``."""
          buf = bytearray(raw)
          # Byte-order adjustment before unpacking with native order; the exact
          # behaviour lives in utils.change_endianness_if_needed.
          utils.change_endianness_if_needed(buf)
          return struct.unpack(fmt, bytes(buf))[0]

      @staticmethod
      def _split_error_message(raw):
          """Split ``b'Code.message'`` into ``(code, message)``.

          If there is no ``.`` or it is the last byte, returns ``(b'', raw)``.
          """
          dot = raw.find(b'.')
          if 0 <= dot < len(raw) - 1:
              return raw[:dot], raw[dot + 1:]
          return b'', raw

      @staticmethod
      def _is_success(status):
          """Return True for a 2xx status code (integer division, so 206 counts too)."""
          return status // 100 == 2

      def read_next_frame(self):
          """Read one complete frame from the response and update the decoder state.

          Data frames make their payload (minus the 8-byte offset) available to
          ``next()``. End and meta-end frames mark the adapter as finished.

          :raises SelectOperationClientError: on an unknown frame type, or when the
              response ends in the middle of a frame.
          :raises InconsistentError: if ``enable_crc`` is set and a data frame's
              CRC32 does not match its payload checksum.
          :raises SelectOperationFailed: if an end / meta-end frame reports a
              non-2xx status.
          """
          frame_type = bytearray(self._read_exact(4, "frame type"))
          payload_length_val = self._unpack("I", self._read_exact(4, "payload length"))
          self._read_exact(4, "header checksum")  # header checksum is not verified
          frame_type[0] = 0  # clear the leading (version) byte before decoding
          frame_type_val = self._unpack("I", frame_type)
          if frame_type_val not in SelectResponseAdapter._KNOWN_FRAME_TYPES:
              logger.warning("Unexpected frame type: {0}. RequestId:{1}. This could be due to the old version of client.".format(frame_type_val, self.request_id))
              raise SelectOperationClientError(self.request_id, "Unexpected frame type:" + str(frame_type_val))

          self.payload = self._read_exact(payload_length_val, "frame payload")
          self.file_offset = self._unpack("Q", self.payload[0:8])

          if frame_type_val == SelectResponseAdapter._DATA_FRAME_TYPE:
              self.frame_length = payload_length_val - 8
              self.frame_off_set = 0
              self.check_sum_flag = 1
              self.frame_data = self.payload[8:]
              checksum_val = self._unpack("I", self._read_exact(4, "payload checksum"))
              if self.enable_crc:
                  crc32 = utils.Crc32()
                  crc32.update(self.payload)
                  checksum_calc = crc32.crc
                  if checksum_val != checksum_calc:
                      logger.warning("Incorrect checksum: Actual {0} and calculated {1}. RequestId:{2}".format(checksum_val, checksum_calc, self.request_id))
                      raise InconsistentError("Incorrect checksum: Actual" + str(checksum_val) + ". Calculated:" + str(checksum_calc), self.request_id)

          elif frame_type_val == SelectResponseAdapter._CONTINIOUS_FRAME_TYPE:
              self.frame_length = self.frame_off_set = 0
              self.check_sum_flag = 1
              self.read_raw(4)  # payload checksum, not verified

          elif frame_type_val == SelectResponseAdapter._END_FRAME_TYPE:
              # Payload: offset (8) | scanned bytes (8) | status (4) | error message
              self.frame_off_set = 0
              self.frame_length = 0
              status = self._unpack("I", self.payload[16:20])
              error_code, error_msg = self._split_error_message(self.payload[20:])
              # Consume the trailing checksum and finish BEFORE a possible raise,
              # so the adapter is not left in the middle of a frame.
              self.read_raw(4)  # payload checksum, not verified
              self.finished = 1
              if not self._is_success(status):
                  raise SelectOperationFailed(status, error_code, error_msg)
              if self.callback is not None:
                  self.callback(self.file_offset, self.content_length)

          elif frame_type_val in (SelectResponseAdapter._META_END_FRAME_TYPE,
                                  SelectResponseAdapter._JSON_META_END_FRAME_TYPE):
              # Payload: offset (8) | scanned bytes (8) | status (4) | splits (4) | rows (8)
              #          [| columns (4), meta end frame only] | error message
              self.frame_off_set = 0
              status = self._unpack("I", self.payload[16:20])
              self.splits = self._unpack("I", self.payload[20:24])
              self.rows = self._unpack("Q", self.payload[24:32])
              if frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE:
                  self.columns = self._unpack("I", self.payload[32:36])
                  error_index = 36
              else:
                  error_index = 32
              error_code, error_msg = self._split_error_message(self.payload[error_index:])
              self.read_raw(4)  # payload checksum, not verified
              self.final_status = status
              self.frame_length = 0
              self.finished = 1
              # Integer division: true division made every non-200 2xx status
              # (e.g. 206) fail.
              if not self._is_success(status):
                  raise SelectOperationFailed(status, error_code, error_msg)