from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from kafka.errors import KafkaError
import json
from loguru import logger


# Producer
class CustomerKafkaProducer():

    def __init__(self, content, requestId):
        self.content = content
        self.configs = self.content["kafka"][self.content["dsp"]["active"]]["producer"]
        self.customerProducer = None
        self.requestId = requestId
        self.get_producer()

    # Create the Kafka producer lazily (only once)
    def get_producer(self):
        if self.customerProducer is None:
            logger.info("Configuring Kafka producer, requestId:{}", self.requestId)
            self.customerProducer = KafkaProducer(
                bootstrap_servers=self.content["kafka"][self.content["dsp"]["active"]]["bootstrap_servers"],
                acks=self.configs["acks"],
                retries=self.configs["retries"],
                linger_ms=self.configs["linger_ms"],
                retry_backoff_ms=self.configs["retry_backoff_ms"],
                max_in_flight_requests_per_connection=self.configs["max_in_flight_requests_per_connection"],
                key_serializer=lambda m: json.dumps(m).encode('utf-8'),
                value_serializer=lambda m: json.dumps(m).encode('utf-8'))

    # mode 1: asynchronous send, 2: synchronous send
    # customer_send_success(requestId, record_metadata): success callback
    # customer_send_error(requestId, excp): failure callback
    def sender(self, topic, key, message, mode, customer_send_success=None, customer_send_error=None):
        self.get_producer()
        logger.info("Kafka sending message, topic:{}|key:{}|message:{}|mode:{}|requestId:{}",
                    topic, key, message, mode, self.requestId)
        if mode == 1:
            if not customer_send_success:
                customer_send_success = CustomerKafkaProducer.on_send_success
            if not customer_send_error:
                customer_send_error = CustomerKafkaProducer.on_send_error
            self.customerProducer.send(topic=topic, key=key, value=message).add_callback(
                customer_send_success, self.requestId).add_errback(customer_send_error, self.requestId)
        if mode == 2:
            try:
                self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
                logger.info("Kafka synchronous send succeeded, requestId:{}", self.requestId)
            except KafkaError as e:
                logger.exception("Kafka synchronous send failed: {}, requestId:{}", e, self.requestId)
                raise e

    def close_producer(self):
        self.customerProducer.flush()
        self.customerProducer.close()
        logger.info("Kafka producer closed, requestId:{}", self.requestId)

    # kafka-python binds requestId via add_callback/add_errback and appends the result,
    # so the callbacks receive (requestId, record_metadata) and (requestId, excp)
    def on_send_success(requestId, record_metadata):
        logger.info("Kafka asynchronous send succeeded, topic:{}|partition:{}|offset:{}|requestId:{}",
                    record_metadata.topic, record_metadata.partition, record_metadata.offset, requestId)

    def on_send_error(requestId, excp):
        logger.exception("Kafka asynchronous send failed: {}, requestId:{}", excp, requestId)
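

# Usage sketch (illustrative, not part of the original module): shows one way to drive
# CustomerKafkaProducer. The config dict below is an assumption that only mirrors the
# keys the class reads (dsp.active, kafka.<env>.bootstrap_servers, kafka.<env>.producer);
# the broker address, topic name and tuning values are placeholders.
def _producer_usage_example():
    example_content = {
        "dsp": {"active": "dev"},
        "kafka": {
            "dev": {
                "bootstrap_servers": ["localhost:9092"],  # placeholder broker
                "producer": {
                    "acks": "all",
                    "retries": 3,
                    "linger_ms": 5,
                    "retry_backoff_ms": 100,
                    "max_in_flight_requests_per_connection": 1,
                },
            },
        },
    }
    producer = CustomerKafkaProducer(example_content, requestId="req-001")
    # mode=2 sends synchronously and raises on failure; mode=1 would use the async callbacks
    producer.sender(topic="demo_topic", key="demo_key", message={"hello": "world"}, mode=2)
    producer.close_producer()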


# Consumer
class CustomerKafkaConsumer():

    def __init__(self, content):
        logger.info("Initializing consumer")
        self.content = content
        self.configs = self.content["kafka"][self.content["dsp"]["active"]]["consumer"]
        self.customerConsumer = None
        logger.info("Consumer initialized")

    # Create the Kafka consumer lazily (only once)
    def get_consumer(self):
        if self.customerConsumer is None:
            logger.info("Creating consumer")
            self.customerConsumer = KafkaConsumer(
                bootstrap_servers=self.content["kafka"][self.content["dsp"]["active"]]["bootstrap_servers"],
                client_id=self.configs["client_id"],
                group_id=self.configs["group_id"],
                auto_offset_reset=self.configs["auto_offset_reset"],
                enable_auto_commit=self.configs["enable_auto_commit"],
                max_poll_records=self.configs["max_poll_records"],
                value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    def subscribe(self, topics=()):
        logger.info("Kafka consumer subscribing to topics:{}", topics)
        if topics is None or len(topics) == 0:
            logger.error("The consumer must subscribe to at least one topic!")
            raise Exception("The consumer must subscribe to at least one topic!")
        self.get_consumer()
        # Manual partition assignment
        # customer_partition = []
        # for topic in topics:
        #     for p in self.content["kafka"]["topic"][topic]["partition"]:
        #         customer_partition.append(TopicPartition(topic, p))
        # self.customerConsumer.assign(customer_partition)
        # Automatic partition assignment via the consumer group
        self.customerConsumer.subscribe(topics=topics)
        logger.info("Kafka consumer topic subscription complete")

    def commit_offset(self, message):
        logger.info("Consumer committing offset, topic:{}|offset:{}|partition:{}",
                    message.topic, message.offset + 1, message.partition)
        tp = TopicPartition(topic=message.topic, partition=message.partition)
        self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(message.offset + 1, None))})
        logger.info("Consumer offset commit complete, topic:{}|offset:{}|partition:{}",
                    message.topic, message.offset + 1, message.partition)
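

# Usage sketch (illustrative, not part of the original module): a minimal poll loop for
# CustomerKafkaConsumer with manual offset commits. The config dict is an assumption
# mirroring the keys the class reads (kafka.<env>.consumer); the broker address, client id,
# group id and topic name are placeholders.
def _consumer_usage_example():
    example_content = {
        "dsp": {"active": "dev"},
        "kafka": {
            "dev": {
                "bootstrap_servers": ["localhost:9092"],  # placeholder broker
                "consumer": {
                    "client_id": "demo-client",
                    "group_id": "demo-group",
                    "auto_offset_reset": "earliest",
                    "enable_auto_commit": False,  # commit manually via commit_offset
                    "max_poll_records": 100,
                },
            },
        },
    }
    consumer = CustomerKafkaConsumer(example_content)
    consumer.get_consumer()
    consumer.subscribe(topics=("demo_topic",))
    # Iterate over incoming records and commit each offset after processing
    for message in consumer.customerConsumer:
        logger.info("Received value:{} from partition:{}", message.value, message.partition)
        consumer.commit_offset(message)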

# if __name__ == "__main__":
#     try:
#         1 / 0
#     except Exception as e:
#         logger.exception("aaaaa:{} {}", e, "11111")