You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

168 lines
7.4KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from json import dumps, loads
  4. from traceback import format_exc
  5. from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
  6. from loguru import logger
  7. # 生产者
  8. class CustomerKafkaProducer:
  9. __slots__ = (
  10. '__configs',
  11. 'customerProducer',
  12. '__bootstrap_servers'
  13. )
  14. def __init__(self, kafka_config):
  15. self.__configs = kafka_config["producer"]
  16. self.__bootstrap_servers = kafka_config["bootstrap_servers"]
  17. self.customerProducer = None
  18. self.get_producer()
  19. # 获取kafka生产者
  20. def get_producer(self):
  21. if self.customerProducer is None:
  22. logger.info("配置kafka生产者!")
  23. self.customerProducer = KafkaProducer(
  24. bootstrap_servers=self.__bootstrap_servers,
  25. acks=self.__configs["acks"],
  26. retries=self.__configs["retries"],
  27. linger_ms=self.__configs["linger_ms"],
  28. retry_backoff_ms=self.__configs["retry_backoff_ms"],
  29. max_in_flight_requests_per_connection=self.__configs["max_in_flight_requests_per_connection"],
  30. key_serializer=lambda m: dumps(m).encode("utf-8"),
  31. value_serializer=lambda m: dumps(m).encode("utf-8"))
  32. # mode 模式1:异步发送 2:同步发送
  33. # def on_send_success(record_metadata): 成功回调
  34. # def on_send_error(exc): 失败回调
  35. def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
  36. retry_send_num = 0
  37. while True:
  38. try:
  39. self.get_producer()
  40. logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message, mode,
  41. message.get("request_id"))
  42. if mode == 1:
  43. if not customer_send_success:
  44. customer_send_success = CustomerKafkaProducer.on_send_success
  45. if not customer_send_error:
  46. customer_send_error = CustomerKafkaProducer.on_send_error
  47. self.customerProducer.send(topic=topic, key=key, value=message) \
  48. .add_callback(customer_send_success, message.get("request_id")) \
  49. .add_errback(customer_send_error, message.get("request_id"))
  50. if mode == 2:
  51. try:
  52. self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
  53. logger.info("kafka同步发送信息成功, requestId:{}", message.get("request_id"))
  54. except Exception as ke:
  55. logger.error("kafka同步发送消息异常: {}, requestId:{}", format_exc(),
  56. message.get("request_id"))
  57. raise ke
  58. break
  59. except Exception as e:
  60. retry_send_num += 1
  61. logger.error("kafka发送消息异常, 开始重试, 当前重试次数:{} requestId:{}", retry_send_num,
  62. message.get("request_id"))
  63. time.sleep(1)
  64. self.customerProducer = None
  65. if retry_send_num > 3:
  66. logger.error("kafka发送消息重试失败: {}, requestId:{}", format_exc(),
  67. message.get("request_id"))
  68. raise e
  69. def close_producer(self):
  70. self.customerProducer.flush()
  71. self.customerProducer.close()
  72. logger.info("kafka生产者关闭完成!")
  73. @staticmethod
  74. def on_send_success(requestId, record_metadata):
  75. logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|requestId:{}", record_metadata.topic,
  76. record_metadata.partition, record_metadata.offset, requestId)
  77. @staticmethod
  78. def on_send_error(requestId, exc):
  79. logger.exception("kafka异步发送消息异常: {}, requestId:{}", exc, requestId)
  80. # 生产者
  81. class CustomerKafkaConsumer:
  82. __slots__ = ('__configs', 'customerConsumer', '__bootstrap_servers', '__topics')
  83. def __init__(self, kafka_config, topics=()):
  84. logger.info("初始化消费者")
  85. self.__configs = kafka_config["consumer"]
  86. self.__bootstrap_servers = kafka_config["bootstrap_servers"]
  87. self.customerConsumer = None
  88. self.__topics = topics
  89. self.subscribe()
  90. logger.info("初始化消费者完成")
  91. def subscribe(self):
  92. if self.customerConsumer is None:
  93. logger.info("获取消费者!")
  94. self.customerConsumer = KafkaConsumer(
  95. bootstrap_servers=self.__bootstrap_servers,
  96. # client_id=self.__configs[KAFKA_CLIENT_ID],
  97. group_id=self.__configs["group_id"],
  98. auto_offset_reset=self.__configs["auto_offset_reset"],
  99. enable_auto_commit=bool(self.__configs["enable_auto_commit"]),
  100. max_poll_records=self.__configs["max_poll_records"],
  101. value_deserializer=lambda m: loads(m.decode("utf-8")))
  102. logger.info("kafka生产者订阅topic:{}", self.__topics)
  103. # if self.topics is None or len(self.topics) == 0:
  104. # logger.error("消费者订阅topic不能为空!")
  105. # raise Exception("消费者订阅topic不能为空!")
  106. # # 手动配置分区
  107. # customer_partition = []
  108. # for topic in self.topics:
  109. # for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
  110. # customer_partition.append(TopicPartition(topic, p))
  111. # self.customerConsumer.assign(customer_partition)
  112. # 自动配置
  113. self.customerConsumer.subscribe(topics=self.__topics)
  114. logger.info("kafka生产者订阅topic完成")
  115. def poll(self):
  116. msg = None
  117. try:
  118. self.subscribe()
  119. msg = self.customerConsumer.poll()
  120. except Exception:
  121. self.customerConsumer = None
  122. logger.error("消费者拉取消息异常: {}", format_exc())
  123. return msg
  124. def commit_offset(self, message, request_id):
  125. retry_num = 0
  126. topic = message.topic
  127. offset = message.offset + 1
  128. partition = message.partition
  129. while True:
  130. try:
  131. self.subscribe()
  132. logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
  133. request_id)
  134. tp = TopicPartition(topic=topic, partition=partition)
  135. self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))})
  136. logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
  137. request_id)
  138. break
  139. except Exception:
  140. self.customerConsumer = None
  141. logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id)
  142. time.sleep(1)
  143. retry_num += 1
  144. if retry_num > 3:
  145. logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
  146. break
  147. # if __name__=="__main__":
  148. # try:
  149. # 1/0
  150. # except Exception as e:
  151. # logger.exception("aaaaa:{} {}", e, "11111")