# -*- coding: utf-8 -*-
import time
from json import dumps, loads
from traceback import format_exc

from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from loguru import logger

# Retry policy shared by the producer and the consumer: a failed operation is
# retried until the failure counter exceeds _MAX_RETRIES (4 attempts total),
# sleeping _RETRY_DELAY_SECONDS between attempts.
_MAX_RETRIES = 3
_RETRY_DELAY_SECONDS = 1
# Upper bound for waiting on the broker acknowledgement in synchronous mode.
_SYNC_SEND_TIMEOUT_SECONDS = 30
# Upper bound for closing a producer that is being discarded after a failure.
_PRODUCER_CLOSE_TIMEOUT_SECONDS = 5


class CustomerKafkaProducer:
    """Thin wrapper around ``KafkaProducer`` with lazy (re)creation and retries.

    The producer settings are read from
    ``context["kafka"][context["dsp"]["active"]]``: the ``"producer"`` mapping
    supplies ``acks``, ``retries``, ``linger_ms``, ``retry_backoff_ms`` and
    ``max_in_flight_requests_per_connection``; ``"bootstrap_servers"`` supplies
    the broker list. Keys and values are serialized as UTF-8 encoded JSON.
    """

    __slots__ = (
        '__context',
        '__configs',
        'customerProducer',
        '__bootstrap_servers'
    )

    def __init__(self, context):
        """Store the configuration and eagerly create the underlying producer."""
        active_cluster = context["kafka"][context["dsp"]["active"]]
        self.__context = context
        self.__configs = active_cluster["producer"]
        self.__bootstrap_servers = active_cluster["bootstrap_servers"]
        self.customerProducer = None
        self.get_producer()

    def get_producer(self):
        """Create the ``KafkaProducer`` if there is none; otherwise do nothing."""
        if self.customerProducer is not None:
            return
        logger.info("配置kafka生产者!")
        configs = self.__configs
        self.customerProducer = KafkaProducer(
            bootstrap_servers=self.__bootstrap_servers,
            acks=configs["acks"],
            retries=configs["retries"],
            linger_ms=configs["linger_ms"],
            retry_backoff_ms=configs["retry_backoff_ms"],
            max_in_flight_requests_per_connection=configs["max_in_flight_requests_per_connection"],
            key_serializer=lambda m: dumps(m).encode("utf-8"),
            value_serializer=lambda m: dumps(m).encode("utf-8"))

    def _discard_producer(self):
        """Close the current producer (best effort) and forget it.

        Used on the failure path so that the next ``get_producer`` call builds
        a fresh client instead of leaking the broken one.
        """
        producer, self.customerProducer = self.customerProducer, None
        if producer is None:
            return
        try:
            producer.close(timeout=_PRODUCER_CLOSE_TIMEOUT_SECONDS)
        except Exception:
            # The producer is already considered broken; a failing close must
            # not mask the original send error.
            logger.warning("kafka生产者关闭异常: {}", format_exc())

    def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
        """Send ``message`` to ``topic``, retrying on failure.

        Args:
            topic: Destination topic.
            key: Message key (JSON-serializable).
            message: Message value (JSON-serializable). When it is a dict, its
                ``"request_id"`` entry is used for log correlation.
            mode: ``1`` sends asynchronously and attaches callbacks; ``2``
                sends synchronously and waits for the broker acknowledgement.
                Any other value sends nothing.
            customer_send_success: Async success callback, invoked as
                ``callback(request_id, record_metadata)``. Defaults to
                ``on_send_success``.
            customer_send_error: Async failure callback, invoked as
                ``callback(request_id, exc)``. Defaults to ``on_send_error``.

        Raises:
            Exception: The last error is re-raised once the retries are
                exhausted.
        """
        # The serializers accept any JSON value, so do not assume a dict here.
        request_id = message.get("request_id") if isinstance(message, dict) else None
        retry_send_num = 0
        while True:
            try:
                self.get_producer()
                logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message,
                            mode, request_id)
                if mode == 1:
                    customer_send_success = customer_send_success or CustomerKafkaProducer.on_send_success
                    customer_send_error = customer_send_error or CustomerKafkaProducer.on_send_error
                    future = self.customerProducer.send(topic=topic, key=key, value=message)
                    # Extra positional arguments are passed to the callback
                    # ahead of the result, hence the (request_id, ...) order.
                    future.add_callback(customer_send_success, request_id)
                    future.add_errback(customer_send_error, request_id)
                if mode == 2:
                    try:
                        self.customerProducer.send(topic=topic, key=key, value=message).get(
                            timeout=_SYNC_SEND_TIMEOUT_SECONDS)
                        logger.info("kafka同步发送信息成功, requestId:{}", request_id)
                    except Exception:
                        logger.error("kafka同步发送消息异常: {}, requestId:{}", format_exc(), request_id)
                        raise
                break
            except Exception:
                retry_send_num += 1
                logger.error("kafka发送消息异常, 开始重试, 当前重试次数:{} requestId:{}", retry_send_num, request_id)
                time.sleep(_RETRY_DELAY_SECONDS)
                # Rebuild the client on the next attempt, releasing the old one.
                self._discard_producer()
                if retry_send_num > _MAX_RETRIES:
                    logger.error("kafka发送消息重试失败: {}, requestId:{}", format_exc(), request_id)
                    raise

    def close_producer(self):
        """Flush pending messages and close the producer; no-op if none exists."""
        producer = self.customerProducer
        if producer is None:
            return
        # Forget the instance first so a later send builds a fresh producer
        # even if flushing or closing fails.
        self.customerProducer = None
        try:
            producer.flush()
        finally:
            producer.close()
        logger.info("kafka生产者关闭完成!")

    @staticmethod
    def on_send_success(requestId, record_metadata):
        """Default async success callback: log where the record was written."""
        logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|requestId:{}", record_metadata.topic,
                    record_metadata.partition, record_metadata.offset, requestId)

    @staticmethod
    def on_send_error(requestId, exc):
        """Default async failure callback: log the error with its traceback."""
        # This runs as a future errback, outside any ``except`` block, so the
        # exception has to be handed to loguru explicitly.
        logger.opt(exception=exc).error("kafka异步发送消息异常: {}, requestId:{}", exc, requestId)


class CustomerKafkaConsumer:
    """Thin wrapper around ``KafkaConsumer`` with lazy (re)creation and retries.

    The consumer settings are read from
    ``context["kafka"][context["dsp"]["active"]]``: the ``"consumer"`` mapping
    supplies ``group_id``, ``auto_offset_reset``, ``enable_auto_commit`` and
    ``max_poll_records``; ``"bootstrap_servers"`` supplies the broker list.
    Message values are decoded as UTF-8 JSON.
    """

    __slots__ = (
        '__context',
        '__configs',
        'customerConsumer',
        '__bootstrap_servers',
        '__topics'
    )

    def __init__(self, context, topics=()):
        """Store the configuration and eagerly subscribe to ``topics``."""
        logger.info("初始化消费者")
        active_cluster = context["kafka"][context["dsp"]["active"]]
        self.__context = context
        self.__configs = active_cluster["consumer"]
        self.__bootstrap_servers = active_cluster["bootstrap_servers"]
        self.customerConsumer = None
        self.__topics = topics
        self.subscribe()
        logger.info("初始化消费者完成")

    def subscribe(self):
        """Create the consumer and subscribe it to the topics, if not done yet.

        Partitions are assigned automatically through the group subscription;
        an earlier revision sketched manual ``assign()`` of configured
        partitions instead.
        """
        if self.customerConsumer is not None:
            return
        logger.info("获取消费者!")
        configs = self.__configs
        consumer = KafkaConsumer(
            bootstrap_servers=self.__bootstrap_servers,
            group_id=configs["group_id"],
            auto_offset_reset=configs["auto_offset_reset"],
            # NOTE(review): bool() of a non-empty string such as "false" is
            # True -- confirm the config stores a real boolean or 0/1 here.
            enable_auto_commit=bool(configs["enable_auto_commit"]),
            max_poll_records=configs["max_poll_records"],
            value_deserializer=lambda m: loads(m.decode("utf-8")))
        logger.info("kafka消费者订阅topic:{}", self.__topics)
        try:
            consumer.subscribe(topics=self.__topics)
        except Exception:
            # Do not leak a connected but unsubscribed consumer.
            consumer.close(autocommit=False)
            raise
        self.customerConsumer = consumer
        logger.info("kafka消费者订阅topic完成")

    def _discard_consumer(self):
        """Close the current consumer (best effort) and forget it."""
        consumer, self.customerConsumer = self.customerConsumer, None
        if consumer is None:
            return
        try:
            # Never auto-commit from a consumer that just failed.
            consumer.close(autocommit=False)
        except Exception:
            logger.warning("kafka消费者关闭异常: {}", format_exc())

    def poll(self, timeout_ms=0):
        """Fetch the next batch of records.

        Args:
            timeout_ms: Milliseconds to wait for data. The default of ``0``
                keeps the previous non-blocking behaviour.

        Returns:
            Whatever ``KafkaConsumer.poll`` returns, or ``None`` if polling
            failed (the consumer is then rebuilt on the next call).
        """
        try:
            self.subscribe()
            return self.customerConsumer.poll(timeout_ms=timeout_ms)
        except Exception:
            logger.error("消费者拉取消息异常: {}", format_exc())
            self._discard_consumer()
            return None

    def commit_offset(self, message):
        """Commit the offset following ``message``, retrying on failure.

        Args:
            message: A consumed record exposing ``topic``, ``partition``,
                ``offset`` and ``value``.

        Returns:
            ``True`` when the commit succeeded, ``False`` when every attempt
            failed (failures are logged, not raised).
        """
        payload = message.value
        # Values are arbitrary JSON; fall back to "1" when there is no id.
        request_id = (payload.get('request_id') if isinstance(payload, dict) else None) or "1"
        topic = message.topic
        partition = message.partition
        # The committed offset is the position of the NEXT record to read.
        offset = message.offset + 1
        retry_num = 0
        while True:
            try:
                self.subscribe()
                logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset,
                            partition, request_id)
                tp = TopicPartition(topic=topic, partition=partition)
                self.customerConsumer.commit(offsets={tp: OffsetAndMetadata(offset, None)})
                logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset,
                            partition, request_id)
                return True
            except Exception:
                logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num,
                             request_id)
                self._discard_consumer()
                time.sleep(_RETRY_DELAY_SECONDS)
                retry_num += 1
                if retry_num > _MAX_RETRIES:
                    logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
                    return False