You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
7.3KB

  1. # -*- coding: utf-8 -*-
  2. import time
  3. from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
  4. from kafka.errors import kafka_errors
  5. import json
  6. from loguru import logger
  7. from common import YmlConstant, Constant
  8. # 生产者
  9. class CustomerKafkaProducer:
  10. def __init__(self, context):
  11. self.__context = context
  12. self.__configs = YmlConstant.get_kafka_producer_config(context)
  13. self.customerProducer = None
  14. self.get_producer()
  15. # 获取kafka生产者
  16. def get_producer(self):
  17. if self.customerProducer is None:
  18. logger.info("配置kafka生产者!")
  19. self.customerProducer = KafkaProducer(
  20. bootstrap_servers=YmlConstant.get_kafka_bootstrap_servers(self.__context),
  21. acks=self.__configs[YmlConstant.KAFKA_ACKS],
  22. retries=self.__configs[YmlConstant.KAFKA_RETRIES],
  23. linger_ms=self.__configs[YmlConstant.KAFKA_LINGER_MS],
  24. retry_backoff_ms=self.__configs[YmlConstant.KAFKA_RETRY_BACKOFF_MS],
  25. max_in_flight_requests_per_connection=self.__configs[YmlConstant.KAFKA_MAX_IN_FLIGHT_REQUESTS],
  26. key_serializer=lambda m: json.dumps(m).encode(Constant.UTF_8),
  27. value_serializer=lambda m: json.dumps(m).encode(Constant.UTF_8))
  28. # mode 模式1:异步发送 2:同步发送
  29. # def on_send_success(record_metadata): 成功回调
  30. # def on_send_error(excp): 失败回调
  31. def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
  32. retry_send_num = 0
  33. while True:
  34. try:
  35. self.get_producer()
  36. logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message, mode,
  37. message.get("request_id"))
  38. if mode == 1:
  39. if not customer_send_success:
  40. customer_send_success = CustomerKafkaProducer.on_send_success
  41. if not customer_send_error:
  42. customer_send_error = CustomerKafkaProducer.on_send_error
  43. self.customerProducer.send(topic=topic, key=key, value=message) \
  44. .add_callback(customer_send_success, message.get(YmlConstant.REQUEST_ID)) \
  45. .add_errback(customer_send_error, message.get(YmlConstant.REQUEST_ID))
  46. if mode == 2:
  47. try:
  48. self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
  49. logger.info("kafka同步发送信息成功, requestId:{}", message.get(YmlConstant.REQUEST_ID))
  50. except kafka_errors as ke:
  51. logger.exception("kafka同步发送消息异常: {}, requestId:{}", ke, message.get(YmlConstant.REQUEST_ID))
  52. raise ke
  53. break
  54. except Exception as e:
  55. retry_send_num += 1
  56. logger.error("kafka发送消息异常, 开始重试, 当前重试次数:{} requestId:{}", retry_send_num,
  57. message.get(YmlConstant.REQUEST_ID))
  58. self.customerProducer = None
  59. if retry_send_num > 3:
  60. logger.exception("kafka发送消息重试失败: {}, requestId:{}", e, message.get(YmlConstant.REQUEST_ID))
  61. raise e
  62. def close_producer(self):
  63. self.customerProducer.flush()
  64. self.customerProducer.close()
  65. logger.info("kafka生产者关闭完成!")
  66. @staticmethod
  67. def on_send_success(requestId, record_metadata):
  68. logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|requestId:{}", record_metadata.topic,
  69. record_metadata.partition, record_metadata.offset, requestId)
  70. @staticmethod
  71. def on_send_error(requestId, excp):
  72. logger.exception("kafka异步发送消息异常: {}, requestId:{}", excp, requestId)
  73. # 生产者
  74. class CustomerKafkaConsumer:
  75. def __init__(self, context, topics=()):
  76. logger.info("初始化消费者")
  77. self.__context = context
  78. self.__configs = YmlConstant.get_kafka_consumer_config(context)
  79. self.customerConsumer = None
  80. self.__topics = topics
  81. self.subscribe()
  82. logger.info("初始化消费者完成")
  83. def subscribe(self):
  84. if self.customerConsumer is None:
  85. logger.info("获取消费者!")
  86. self.customerConsumer = KafkaConsumer(
  87. bootstrap_servers=YmlConstant.get_kafka_bootstrap_servers(self.__context),
  88. client_id=self.__configs[YmlConstant.KAFKA_CLIENT_ID],
  89. group_id=self.__configs[YmlConstant.KAFKA_GROUP_ID],
  90. auto_offset_reset=self.__configs[YmlConstant.KAFKA_AUTO_OFFSET_RESET],
  91. enable_auto_commit=self.__configs[YmlConstant.KAFKA_ENABLE_AUTO_COMMIT],
  92. max_poll_records=self.__configs[YmlConstant.KAFKA_MAX_POLL_RECORDS],
  93. value_deserializer=lambda m: json.loads(m.decode(Constant.UTF_8)))
  94. logger.info("kafka生产者订阅topic:{}", self.__topics)
  95. # if self.topics is None or len(self.topics) == 0:
  96. # logger.error("消费者订阅topic不能为空!")
  97. # raise Exception("消费者订阅topic不能为空!")
  98. # # 手动配置分区
  99. # customer_partition = []
  100. # for topic in self.topics:
  101. # for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
  102. # customer_partition.append(TopicPartition(topic, p))
  103. # self.customerConsumer.assign(customer_partition)
  104. # 自动配置
  105. self.customerConsumer.subscribe(topics=self.__topics)
  106. logger.info("kafka生产者订阅topic完成")
  107. def poll(self):
  108. msg = None
  109. try:
  110. self.subscribe()
  111. msg = self.customerConsumer.poll()
  112. except Exception as e:
  113. self.customerConsumer = None
  114. logger.exception("消费者拉取消息异常: {}", e)
  115. return msg
  116. def commit_offset(self, message):
  117. retry_num = 1
  118. while True:
  119. try:
  120. self.subscribe()
  121. logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}", message.topic, message.offset + 1,
  122. message.partition)
  123. tp = TopicPartition(topic=message.topic, partition=message.partition)
  124. self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(message.offset + 1, None))})
  125. logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}", message.topic, message.offset + 1,
  126. message.partition)
  127. break
  128. except Exception as e:
  129. self.customerConsumer = None
  130. logger.exception("消费者提交offset异常: {}, 重试次数: {}", e, retry_num)
  131. time.sleep(1)
  132. retry_num += 1
  133. if retry_num > 3:
  134. logger.exception("消费者提交offset重试失败: {}", e)
  135. break
  136. # if __name__=="__main__":
  137. # try:
  138. # 1/0
  139. # except Exception as e:
  140. # logger.exception("aaaaa:{} {}", e, "11111")