# -*- coding: utf-8 -*-
import time
from json import dumps, loads
from traceback import format_exc

from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from loguru import logger


# Producer
class CustomerKafkaProducer:
    __slots__ = (
        '__context',
        '__configs',
        'customerProducer',
        '__bootstrap_servers'
    )

    def __init__(self, context):
        self.__context = context
        self.__configs = context["kafka"][context["dsp"]["active"]]["producer"]
        self.__bootstrap_servers = context["kafka"][context["dsp"]["active"]]["bootstrap_servers"]
        self.customerProducer = None
        self.get_producer()

    # Create the Kafka producer if it does not exist yet
    def get_producer(self):
        if self.customerProducer is None:
            logger.info("Configuring Kafka producer!")
            self.customerProducer = KafkaProducer(
                bootstrap_servers=self.__bootstrap_servers,
                acks=self.__configs["acks"],
                retries=self.__configs["retries"],
                linger_ms=self.__configs["linger_ms"],
                retry_backoff_ms=self.__configs["retry_backoff_ms"],
                max_in_flight_requests_per_connection=self.__configs["max_in_flight_requests_per_connection"],
                key_serializer=lambda m: dumps(m).encode("utf-8"),
                value_serializer=lambda m: dumps(m).encode("utf-8"))

    # mode: 1 = asynchronous send, 2 = synchronous send
    # customer_send_success(record_metadata): success callback
    # customer_send_error(exc): failure callback
    def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
        retry_send_num = 0
        while True:
            try:
                self.get_producer()
                logger.info("Kafka sending message, topic:{}|key:{}|message:{}|mode:{}|requestId:{}",
                            topic, key, message, mode, message.get("requestId"))
                if mode == 1:
                    # Asynchronous send: register success/error callbacks and return immediately
                    if not customer_send_success:
                        customer_send_success = CustomerKafkaProducer.on_send_success
                    if not customer_send_error:
                        customer_send_error = CustomerKafkaProducer.on_send_error
                    self.customerProducer.send(topic=topic, key=key, value=message) \
                        .add_callback(customer_send_success, message.get("requestId")) \
                        .add_errback(customer_send_error, message.get("requestId"))
                if mode == 2:
                    # Synchronous send: block until the broker acknowledges or the timeout expires
                    try:
                        self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
                        logger.info("Kafka synchronous send succeeded, requestId:{}", message.get("requestId"))
                    except Exception as ke:
                        logger.error("Kafka synchronous send failed: {}, requestId:{}", format_exc(),
                                     message.get("requestId"))
                        raise ke
                break
            except Exception as e:
                # Drop the producer, wait a second and retry; give up after three retries
                retry_send_num += 1
                logger.error("Kafka send failed, retrying, current retry count:{} requestId:{}", retry_send_num,
                             message.get("requestId"))
                time.sleep(1)
                self.customerProducer = None
                if retry_send_num > 3:
                    logger.error("Kafka send retries exhausted: {}, requestId:{}", format_exc(),
                                 message.get("requestId"))
                    raise e

    def close_producer(self):
        self.customerProducer.flush()
        self.customerProducer.close()
        logger.info("Kafka producer closed!")

    @staticmethod
    def on_send_success(requestId, record_metadata):
        logger.info("Kafka asynchronous send succeeded, topic:{}|partition:{}|offset:{}|requestId:{}",
                    record_metadata.topic, record_metadata.partition, record_metadata.offset, requestId)

    @staticmethod
    def on_send_error(requestId, exc):
        logger.exception("Kafka asynchronous send failed: {}, requestId:{}", exc, requestId)
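
# Usage sketch (illustrative, not part of this module): how the producer above might be
# wired up. The context layout is inferred from the lookups in __init__; the hostname and
# tuning values are assumptions, not values taken from this repository.
#
# context = {
#     "dsp": {"active": "dev"},
#     "kafka": {
#         "dev": {
#             "bootstrap_servers": ["localhost:9092"],
#             "producer": {
#                 "acks": "all",
#                 "retries": 3,
#                 "linger_ms": 5,
#                 "retry_backoff_ms": 200,
#                 "max_in_flight_requests_per_connection": 1,
#             },
#         },
#     },
# }
# producer = CustomerKafkaProducer(context)
# # mode=1: fire-and-forget with callbacks; mode=2: block for up to 30 s waiting for the ack
# producer.sender(topic="demo-topic", key="demo",
#                 message={"requestId": "req-001", "payload": "hello"}, mode=2)
# producer.close_producer()
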
# Consumer
class CustomerKafkaConsumer:
    __slots__ = (
        '__context',
        '__configs',
        'customerConsumer',
        '__bootstrap_servers',
        '__topics'
    )

    def __init__(self, context, topics=()):
        logger.info("Initializing consumer")
        self.__context = context
        self.__configs = context["kafka"][context["dsp"]["active"]]["consumer"]
        self.__bootstrap_servers = context["kafka"][context["dsp"]["active"]]["bootstrap_servers"]
        self.customerConsumer = None
        self.__topics = topics
        self.subscribe()
        logger.info("Consumer initialization complete")

    # Create the consumer and subscribe to the configured topics if it does not exist yet
    def subscribe(self):
        if self.customerConsumer is None:
            logger.info("Creating Kafka consumer!")
            self.customerConsumer = KafkaConsumer(
                bootstrap_servers=self.__bootstrap_servers,
                # client_id=self.__configs[KAFKA_CLIENT_ID],
                group_id=self.__configs["group_id"],
                auto_offset_reset=self.__configs["auto_offset_reset"],
                enable_auto_commit=bool(self.__configs["enable_auto_commit"]),
                max_poll_records=self.__configs["max_poll_records"],
                value_deserializer=lambda m: loads(m.decode("utf-8")))
            logger.info("Kafka consumer subscribing to topics:{}", self.__topics)
            # if self.topics is None or len(self.topics) == 0:
            #     logger.error("Consumer topics must not be empty!")
            #     raise Exception("Consumer topics must not be empty!")
            # # Manually assign partitions
            # customer_partition = []
            # for topic in self.topics:
            #     for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
            #         customer_partition.append(TopicPartition(topic, p))
            # self.customerConsumer.assign(customer_partition)
            # Automatic partition assignment
            self.customerConsumer.subscribe(topics=self.__topics)
            logger.info("Kafka consumer topic subscription complete")

    # Fetch records; returns a dict of {TopicPartition: [ConsumerRecord, ...]} or None on error
    def poll(self):
        msg = None
        try:
            self.subscribe()
            msg = self.customerConsumer.poll()
        except Exception:
            self.customerConsumer = None
            logger.error("Consumer poll failed: {}", format_exc())
        return msg

    # Manually commit the offset following the message that has just been processed
    def commit_offset(self, message):
        retry_num = 0
        requestId = message.value.get('requestId')
        if not requestId:
            requestId = "1"
        topic = message.topic
        offset = message.offset + 1
        partition = message.partition
        while True:
            try:
                self.subscribe()
                logger.info("Consumer committing offset, topic:{}|offset:{}|partition:{}|requestId:{}",
                            topic, offset, partition, requestId)
                tp = TopicPartition(topic=topic, partition=partition)
                self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))})
                logger.info("Consumer offset commit complete, topic:{}|offset:{}|partition:{}|requestId:{}",
                            topic, offset, partition, requestId)
                break
            except Exception:
                # Drop the consumer, wait a second and retry; give up after three retries
                self.customerConsumer = None
                logger.error("Consumer offset commit failed: {}, retry count: {}, requestId:{}",
                             format_exc(), retry_num, requestId)
                time.sleep(1)
                retry_num += 1
                if retry_num > 3:
                    logger.error("Consumer offset commit retries exhausted: {}, requestId:{}", format_exc(), requestId)
                    break

# if __name__ == "__main__":
#     try:
#         1 / 0
#     except Exception as e:
#         logger.exception("aaaaa:{} {}", e, "11111")
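
# Usage sketch (illustrative, not part of this module): a poll/commit loop for
# CustomerKafkaConsumer. KafkaConsumer.poll() returns a dict mapping TopicPartition to a
# list of ConsumerRecords, hence the nested iteration below. The broker address, group id
# and other settings are assumptions, mirroring the producer example above.
#
# consumer_context = {
#     "dsp": {"active": "dev"},
#     "kafka": {
#         "dev": {
#             "bootstrap_servers": ["localhost:9092"],
#             "consumer": {
#                 "group_id": "demo-group",
#                 "auto_offset_reset": "earliest",
#                 "enable_auto_commit": 0,  # 0 -> False: commit manually via commit_offset()
#                 "max_poll_records": 100,
#             },
#         },
#     },
# }
# consumer = CustomerKafkaConsumer(consumer_context, topics=("demo-topic",))
# while True:
#     records = consumer.poll()
#     if not records:
#         time.sleep(0.5)
#         continue
#     for tp, messages in records.items():
#         for message in messages:
#             # ... process message.value here ...
#             consumer.commit_offset(message)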