空港防疫算法交互
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

154 lines
6.8KB

  1. import time
  2. from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
  3. from kafka.errors import kafka_errors
  4. import json
  5. from loguru import logger
  6. # 生产者
  7. class CustomerKafkaProducer():
  8. def __init__(self, content):
  9. self.content = content
  10. self.configs = self.content["kafka"][self.content["dsp"]["active"]]["producer"]
  11. self.customerProducer = None
  12. self.get_producer()
  13. # 获取kafka生产者
  14. def get_producer(self):
  15. if self.customerProducer is None:
  16. logger.info("配置kafka生产者!")
  17. self.customerProducer = KafkaProducer(
  18. bootstrap_servers=self.content["kafka"][self.content["dsp"]["active"]]["bootstrap_servers"],
  19. acks=self.configs["acks"],
  20. retries=self.configs["retries"],
  21. linger_ms=self.configs["linger_ms"],
  22. retry_backoff_ms=self.configs["retry_backoff_ms"],
  23. max_in_flight_requests_per_connection=self.configs[
  24. "max_in_flight_requests_per_connection"],
  25. key_serializer=lambda m: json.dumps(m).encode('utf-8'),
  26. value_serializer=lambda m: json.dumps(m).encode('utf-8'))
  27. # mode 模式1:异步发送 2:同步发送
  28. # def on_send_success(record_metadata): 成功回调
  29. # def on_send_error(excp): 失败回调
  30. def sender(self, topic, key, message, mode, customer_send_success=None, customer_send_error=None):
  31. retry_send_num = 1
  32. while True:
  33. try:
  34. self.get_producer()
  35. logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|msgId:{}", topic, key, message, mode,
  36. message.get("msgId"))
  37. if mode == 1:
  38. if not customer_send_success:
  39. customer_send_success = CustomerKafkaProducer.on_send_success
  40. if not customer_send_error:
  41. customer_send_error = CustomerKafkaProducer.on_send_error
  42. self.customerProducer.send(topic=topic, key=key, value=message) \
  43. .add_callback(customer_send_success, message.get("msgId")) \
  44. .add_errback(customer_send_error, message.get("msgId"))
  45. if mode == 2:
  46. try:
  47. self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
  48. logger.info("kafka同步发送信息成功, msgId:{}", message.get("msgId"))
  49. except kafka_errors as e:
  50. logger.exception("kafka同步发送消息异常: {}, msgId:{}", e, message.get("msgId"))
  51. raise e
  52. break
  53. except Exception as e:
  54. logger.exception("kafka发送消息异常: {}, msgId:{}", e, message.get("msgId"))
  55. self.customerProducer = None
  56. retry_send_num += 1
  57. if retry_send_num > 2:
  58. logger.exception("kafka发送消息重试失败: {}, msgId:{}", e, message.get("msgId"))
  59. raise e
  60. def close_producer(self):
  61. self.customerProducer.flush()
  62. self.customerProducer.close()
  63. logger.info("kafka生产者关闭完成!")
  64. def on_send_success(msgId, record_metadata):
  65. logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|msgId:{}", record_metadata.topic,
  66. record_metadata.partition, record_metadata.offset, msgId)
  67. def on_send_error(msgId, excp):
  68. logger.exception("kafka异步发送消息异常: {}, msgId:{}", excp, msgId)
  69. # 生产者
  70. class CustomerKafkaConsumer():
  71. def __init__(self, content, topics=()):
  72. logger.info("初始化消费者")
  73. self.content = content
  74. self.configs = self.content["kafka"][self.content["dsp"]["active"]]["consumer"]
  75. self.customerConsumer = None
  76. self.topics = topics
  77. self.subscribe()
  78. logger.info("初始化消费者完成")
  79. def subscribe(self):
  80. if self.customerConsumer is None:
  81. logger.info("获取消费者!")
  82. self.customerConsumer = KafkaConsumer(
  83. bootstrap_servers=self.content["kafka"][self.content["dsp"]["active"]]["bootstrap_servers"],
  84. client_id=self.configs["client_id"],
  85. group_id=self.configs["group_id"],
  86. auto_offset_reset=self.configs["auto_offset_reset"],
  87. enable_auto_commit=self.configs["enable_auto_commit"],
  88. max_poll_records=self.configs["max_poll_records"],
  89. value_deserializer=lambda m: json.loads(m.decode('utf-8')))
  90. logger.info("kafka生产者订阅topic:{}", self.topics)
  91. if self.topics is None or len(self.topics) == 0:
  92. logger.error("消费者订阅topic不能为空!")
  93. raise Exception("消费者订阅topic不能为空!")
  94. # 手动配置分区
  95. # customer_partition = []
  96. # for topic in self.topics:
  97. # for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
  98. # customer_partition.append(TopicPartition(topic, p))
  99. # self.customerConsumer.assign(customer_partition)
  100. # 自动配置
  101. self.customerConsumer.subscribe(self.topics)
  102. logger.info("kafka生产者订阅topic完成")
  103. def poll(self):
  104. msg = None
  105. try:
  106. self.subscribe()
  107. msg = self.customerConsumer.poll()
  108. except Exception as e:
  109. self.customerConsumer = None
  110. logger.exception("消费者拉取消息异常: {}", e)
  111. return msg
  112. def commit_offset(self, message):
  113. retry_num = 1
  114. while True:
  115. try:
  116. self.subscribe()
  117. logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}", message.topic, message.offset + 1,
  118. message.partition)
  119. tp = TopicPartition(topic=message.topic, partition=message.partition)
  120. self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(message.offset + 1, None))})
  121. logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}", message.topic, message.offset + 1,
  122. message.partition)
  123. break
  124. except Exception as e:
  125. self.customerConsumer = None
  126. logger.exception("消费者提交offset异常: {}, 重试次数: {}", e, retry_num)
  127. time.sleep(1)
  128. retry_num += 1
  129. if retry_num > 3:
  130. logger.exception("消费者提交offset重试失败: {}", e)
  131. break
  132. # if __name__=="__main__":
  133. # try:
  134. # 1/0
  135. # except Exception as e:
  136. # logger.exception("aaaaa:{} {}", e, "11111")