|
- # -*- coding: utf-8 -*-
- import time
- from json import dumps, loads
- from traceback import format_exc
-
- from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
- from loguru import logger
-
-
- # 生产者
class CustomerKafkaProducer:
    """Wrapper around ``KafkaProducer`` with lazy (re)creation and send retries.

    Settings are read from ``context["kafka"][context["dsp"]["active"]]``:
    the ``bootstrap_servers`` entry plus the ``producer`` sub-mapping
    (``acks``, ``retries``, ``linger_ms``, ``retry_backoff_ms`` and
    ``max_in_flight_requests_per_connection``). Keys and values are
    serialized with ``json.dumps`` and encoded as UTF-8.
    """

    __slots__ = (
        '__context',
        '__configs',
        'customerProducer',
        '__bootstrap_servers'
    )

    def __init__(self, context):
        """Store the configuration and eagerly create the underlying producer.

        Args:
            context: Nested mapping holding the ``kafka`` and ``dsp`` sections
                described on the class.
        """
        self.__context = context
        active_config = context["kafka"][context["dsp"]["active"]]
        self.__configs = active_config["producer"]
        self.__bootstrap_servers = active_config["bootstrap_servers"]
        self.customerProducer = None
        self.get_producer()

    def get_producer(self):
        """Create the ``KafkaProducer`` if there is none yet; otherwise do nothing."""
        if self.customerProducer is not None:
            return
        logger.info("配置kafka生产者!")
        self.customerProducer = KafkaProducer(
            bootstrap_servers=self.__bootstrap_servers,
            acks=self.__configs["acks"],
            retries=self.__configs["retries"],
            linger_ms=self.__configs["linger_ms"],
            retry_backoff_ms=self.__configs["retry_backoff_ms"],
            max_in_flight_requests_per_connection=self.__configs["max_in_flight_requests_per_connection"],
            key_serializer=lambda m: dumps(m).encode("utf-8"),
            value_serializer=lambda m: dumps(m).encode("utf-8"))

    def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
        """Send ``message`` to ``topic``, retrying on failure.

        Args:
            topic: Destination topic.
            key: Message key (JSON-serialized by the producer).
            message: Mapping to send; its ``requestId`` entry (if any) is used
                for log correlation and passed to the async callbacks.
            mode: ``1`` sends asynchronously and attaches callbacks; ``2``
                sends synchronously, waiting up to 30 seconds for the result.
                Any other value sends nothing.
            customer_send_success: Optional callback for mode 1, called as
                ``callback(request_id, record_metadata)``. Defaults to
                :meth:`on_send_success`.
            customer_send_error: Optional errback for mode 1, called as
                ``callback(request_id, exc)``. Defaults to
                :meth:`on_send_error`.

        Raises:
            Exception: The last error, once the initial attempt and three
                retries have all failed.
        """
        request_id = message.get("requestId")
        on_success = customer_send_success or CustomerKafkaProducer.on_send_success
        on_error = customer_send_error or CustomerKafkaProducer.on_send_error
        retry_send_num = 0
        while True:
            try:
                self.get_producer()
                logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message, mode,
                            request_id)
                if mode == 1:
                    future = self.customerProducer.send(topic=topic, key=key, value=message)
                    future.add_callback(on_success, request_id).add_errback(on_error, request_id)
                elif mode == 2:
                    try:
                        self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
                        logger.info("kafka同步发送信息成功, requestId:{}", request_id)
                    except Exception:
                        logger.error("kafka同步发送消息异常: {}, requestId:{}", format_exc(), request_id)
                        raise
                break
            except Exception:
                retry_send_num += 1
                logger.error("kafka发送消息异常, 开始重试, 当前重试次数:{} requestId:{}", retry_send_num,
                             request_id)
                # Drop the producer so the next attempt (or next call) rebuilds it.
                self.customerProducer = None
                if retry_send_num > 3:
                    logger.error("kafka发送消息重试失败: {}, requestId:{}", format_exc(), request_id)
                    raise
                # Back off only when another attempt will actually follow.
                time.sleep(1)

    def close_producer(self):
        """Flush pending records and close the producer.

        Does nothing when no producer exists (for example after ``sender``
        discarded it following a failed send). The producer is closed even if
        flushing raises, and the reference is cleared so that a later
        ``sender`` call creates a fresh one.
        """
        producer = self.customerProducer
        if producer is None:
            return
        try:
            producer.flush()
        finally:
            producer.close()
            self.customerProducer = None
        logger.info("kafka生产者关闭完成!")

    @staticmethod
    def on_send_success(requestId, record_metadata):
        """Default async success callback: log where the record was written."""
        logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|requestId:{}", record_metadata.topic,
                    record_metadata.partition, record_metadata.offset, requestId)

    @staticmethod
    def on_send_error(requestId, exc):
        """Default async errback: log the failure together with the exception."""
        # The errback runs outside any ``except`` block, so the exception must
        # be handed to the logger explicitly for its details to be recorded.
        logger.opt(exception=exc).error("kafka异步发送消息异常: {}, requestId:{}", exc, requestId)
-
-
- # Consumer
class CustomerKafkaConsumer:
    """Wrapper around ``KafkaConsumer`` with lazy (re)creation and commit retries.

    Settings are read from ``context["kafka"][context["dsp"]["active"]]``:
    the ``bootstrap_servers`` entry plus the ``consumer`` sub-mapping
    (``group_id``, ``auto_offset_reset``, ``enable_auto_commit`` and
    ``max_poll_records``). Message values are decoded as UTF-8 JSON.
    """

    __slots__ = (
        '__context',
        '__configs',
        'customerConsumer',
        '__bootstrap_servers',
        '__topics'
    )

    def __init__(self, context, topics=()):
        """Store the configuration and subscribe to ``topics`` right away.

        Args:
            context: Nested mapping holding the ``kafka`` and ``dsp`` sections
                described on the class.
            topics: Topic names to subscribe to.
        """
        logger.info("初始化消费者")
        self.__context = context
        active_config = context["kafka"][context["dsp"]["active"]]
        self.__configs = active_config["consumer"]
        self.__bootstrap_servers = active_config["bootstrap_servers"]
        self.customerConsumer = None
        self.__topics = topics
        self.subscribe()
        logger.info("初始化消费者完成")

    def subscribe(self):
        """Create the consumer and subscribe it to the topics, if not done yet.

        Does nothing when a consumer already exists.
        """
        if self.customerConsumer is not None:
            return
        logger.info("获取消费者!")
        self.customerConsumer = KafkaConsumer(
            bootstrap_servers=self.__bootstrap_servers,
            group_id=self.__configs["group_id"],
            auto_offset_reset=self.__configs["auto_offset_reset"],
            # NOTE(review): bool() of a non-empty string such as "false" is
            # True -- confirm the config supplies a real boolean or 0/1.
            enable_auto_commit=bool(self.__configs["enable_auto_commit"]),
            max_poll_records=self.__configs["max_poll_records"],
            value_deserializer=lambda m: loads(m.decode("utf-8")))
        logger.info("kafka消费者订阅topic:{}", self.__topics)
        # Subscribe by topic name rather than assigning partitions by hand.
        self.customerConsumer.subscribe(topics=self.__topics)
        logger.info("kafka消费者订阅topic完成")

    def poll(self, timeout_ms=0):
        """Fetch records from the subscribed topics.

        Args:
            timeout_ms: Passed through to ``KafkaConsumer.poll``. The default
                of 0 matches the library default used before this parameter
                existed.

        Returns:
            Whatever ``KafkaConsumer.poll`` returns, or ``None`` if polling
            raised. On error the consumer is discarded and rebuilt on the
            next call.
        """
        msg = None
        try:
            self.subscribe()
            msg = self.customerConsumer.poll(timeout_ms=timeout_ms)
        except Exception:
            self.customerConsumer = None
            logger.error("消费者拉取消息异常: {}", format_exc())
        return msg

    def commit_offset(self, message):
        """Commit the offset following ``message`` for its topic partition.

        Makes one attempt plus up to three retries, one second apart. A final
        failure is logged but not raised.

        Args:
            message: Consumed record exposing ``topic``, ``partition``,
                ``offset`` and a mapping ``value`` whose ``requestId`` (if
                any) is used for log correlation.
        """
        # Fall back to "1" when the payload carries no usable request id.
        request_id = message.value.get('requestId') or "1"
        topic = message.topic
        partition = message.partition
        # The committed offset is the position of the next record to read.
        offset = message.offset + 1
        retry_num = 0
        while True:
            try:
                self.subscribe()
                logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
                            request_id)
                tp = TopicPartition(topic=topic, partition=partition)
                self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))})
                logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
                            request_id)
                break
            except Exception:
                # Drop the consumer so the next attempt rebuilds it.
                self.customerConsumer = None
                logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id)
                retry_num += 1
                if retry_num > 3:
                    logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
                    break
                # Back off only when another attempt will actually follow.
                time.sleep(1)
-
- # if __name__=="__main__":
- # try:
- # 1/0
- # except Exception as e:
- # logger.exception("aaaaa:{} {}", e, "11111")
|