"""Thin wrappers around kafka-python's producer and consumer.

Both wrappers lazily (re)create the underlying client, read their settings
from a nested configuration dict, and retry a bounded number of times when
an operation fails.
"""
import json
import time

from kafka import KafkaConsumer, KafkaProducer, OffsetAndMetadata, TopicPartition
from kafka.errors import KafkaError, kafka_errors
from loguru import logger


# Producer
class CustomerKafkaProducer:
    """JSON-serializing Kafka producer with lazy creation and send retries.

    ``content`` is the configuration dict. The producer settings are read
    from ``content["kafka"][content["dsp"]["active"]]["producer"]`` and the
    broker list from ``content["kafka"][content["dsp"]["active"]]["bootstrap_servers"]``.
    """

    # Upper bound (seconds) on closing a producer that is being discarded
    # after a failure, so a dead broker cannot stall the retry loop forever.
    _DISCARD_CLOSE_TIMEOUT_SECONDS = 5

    def __init__(self, content):
        self.content = content
        self.configs = self.content["kafka"][self.content["dsp"]["active"]]["producer"]
        self.customerProducer = None
        self.get_producer()

    def get_producer(self):
        """Create the underlying ``KafkaProducer`` if none exists yet."""
        if self.customerProducer is None:
            logger.info("配置kafka生产者!")
            self.customerProducer = KafkaProducer(
                bootstrap_servers=self.content["kafka"][self.content["dsp"]["active"]]["bootstrap_servers"],
                acks=self.configs["acks"],
                retries=self.configs["retries"],
                linger_ms=self.configs["linger_ms"],
                retry_backoff_ms=self.configs["retry_backoff_ms"],
                max_in_flight_requests_per_connection=self.configs[
                    "max_in_flight_requests_per_connection"],
                key_serializer=lambda m: json.dumps(m).encode('utf-8'),
                value_serializer=lambda m: json.dumps(m).encode('utf-8'))

    def _discard_producer(self):
        """Drop the current producer, closing it best-effort.

        Closing releases the sockets and background thread of the failed
        client instead of leaking them; a failure to close is only logged.
        """
        producer, self.customerProducer = self.customerProducer, None
        if producer is None:
            return
        try:
            producer.close(timeout=self._DISCARD_CLOSE_TIMEOUT_SECONDS)
        except Exception as close_error:
            logger.warning("关闭异常的kafka生产者失败: {}", close_error)

    def sender(self, topic, key, message, mode, customer_send_success=None, customer_send_error=None):
        """Send ``message`` to ``topic``, retrying on failure.

        Args:
            topic: Target topic name.
            key: Message key (JSON-serialized by the producer).
            message: Message value; must provide ``.get("request_id")``
                because the request id is used for logging and callbacks.
            mode: ``1`` sends asynchronously and registers callbacks;
                ``2`` sends synchronously and waits up to 30 seconds.
                Any other value sends nothing and only logs a warning.
            customer_send_success: Optional success callback, called as
                ``callback(request_id, record_metadata)``. Defaults to
                ``CustomerKafkaProducer.on_send_success``.
            customer_send_error: Optional failure callback, called as
                ``callback(request_id, exception)``. Defaults to
                ``CustomerKafkaProducer.on_send_error``.

        Raises:
            Exception: The last error is re-raised once the initial attempt
                and three retries have all failed.
        """
        request_id = message.get("request_id")
        retry_send_num = 0
        while True:
            try:
                self.get_producer()
                logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message,
                            mode, request_id)
                if mode == 1:
                    if not customer_send_success:
                        customer_send_success = CustomerKafkaProducer.on_send_success
                    if not customer_send_error:
                        customer_send_error = CustomerKafkaProducer.on_send_error
                    self.customerProducer.send(topic=topic, key=key, value=message) \
                        .add_callback(customer_send_success, request_id) \
                        .add_errback(customer_send_error, request_id)
                elif mode == 2:
                    try:
                        self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
                        logger.info("kafka同步发送信息成功, requestId:{}", request_id)
                    except KafkaError as e:
                        # KafkaError is the exception base class; the
                        # `kafka_errors` name is a dict and cannot be caught.
                        logger.exception("kafka同步发送消息异常: {}, requestId:{}", e, request_id)
                        raise
                else:
                    # Nothing is sent for an unknown mode; make that visible.
                    logger.warning("kafka发送模式不支持, 消息未发送, mode:{}, requestId:{}", mode, request_id)
                break
            except Exception as e:
                logger.exception("kafka发送消息异常: {}, requestId:{}", e, request_id)
                # Force a fresh producer on the next attempt.
                self._discard_producer()
                retry_send_num += 1
                if retry_send_num > 3:
                    logger.exception("kafka发送消息重试失败: {}, requestId:{}", e, request_id)
                    raise

    def close_producer(self):
        """Flush pending messages and close the producer, if one exists."""
        if self.customerProducer is None:
            # A failed send may already have discarded the producer.
            return
        self.customerProducer.flush()
        self.customerProducer.close()
        # Clear the reference so a later send creates a new producer
        # instead of using a closed one.
        self.customerProducer = None
        logger.info("kafka生产者关闭完成!")

    @staticmethod
    def on_send_success(requestId, record_metadata):
        """Default async success callback: log where the record landed."""
        logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|requestId:{}",
                    record_metadata.topic,
                    record_metadata.partition,
                    record_metadata.offset,
                    requestId)

    @staticmethod
    def on_send_error(requestId, excp):
        """Default async failure callback: log the error with its traceback."""
        # The callback runs outside any `except` block, so the exception is
        # passed explicitly for loguru to attach its traceback.
        logger.opt(exception=excp).error("kafka异步发送消息异常: {}, requestId:{}", excp, requestId)


# Consumer
class CustomerKafkaConsumer:
    """JSON-deserializing Kafka consumer with lazy creation and commit retries.

    ``content`` is the configuration dict. The consumer settings are read
    from ``content["kafka"][content["dsp"]["active"]]["consumer"]`` and the
    broker list from ``content["kafka"][content["dsp"]["active"]]["bootstrap_servers"]``.
    ``topics`` is the collection of topic names to subscribe to.
    """

    def __init__(self, content, topics=()):
        logger.info("初始化消费者")
        self.content = content
        self.configs = self.content["kafka"][self.content["dsp"]["active"]]["consumer"]
        self.customerConsumer = None
        self.topics = topics
        self.subscribe()
        logger.info("初始化消费者完成")

    def subscribe(self):
        """Create the consumer and subscribe to ``self.topics`` if needed.

        Does nothing when a consumer already exists.
        """
        if self.customerConsumer is None:
            logger.info("获取消费者!")
            self.customerConsumer = KafkaConsumer(
                bootstrap_servers=self.content["kafka"][self.content["dsp"]["active"]]["bootstrap_servers"],
                client_id=self.configs["client_id"],
                group_id=self.configs["group_id"],
                auto_offset_reset=self.configs["auto_offset_reset"],
                enable_auto_commit=self.configs["enable_auto_commit"],
                max_poll_records=self.configs["max_poll_records"],
                value_deserializer=lambda m: json.loads(m.decode('utf-8')))
            logger.info("kafka消费者订阅topic:{}", self.topics)
            # NOTE: partitions are assigned automatically through the group
            # subscription. A manual alternative would build TopicPartition
            # objects from content["kafka"][active][topic]["partition"] and
            # call assign() instead of subscribe().
            self.customerConsumer.subscribe(topics=self.topics)
            logger.info("kafka消费者订阅topic完成")

    def _discard_consumer(self):
        """Drop the current consumer, closing it best-effort.

        Offsets are deliberately not committed on this path; a failure to
        close is only logged.
        """
        consumer, self.customerConsumer = self.customerConsumer, None
        if consumer is None:
            return
        try:
            consumer.close(autocommit=False)
        except Exception as close_error:
            logger.warning("关闭异常的kafka消费者失败: {}", close_error)

    def poll(self):
        """Poll the consumer once.

        Returns:
            Whatever ``KafkaConsumer.poll()`` returns, or ``None`` when an
            error occurred. On error the consumer is discarded so the next
            call recreates it.
        """
        msg = None
        try:
            self.subscribe()
            msg = self.customerConsumer.poll()
        except Exception as e:
            logger.exception("消费者拉取消息异常: {}", e)
            self._discard_consumer()
        return msg

    def commit_offset(self, message):
        """Commit ``message.offset + 1`` for the message's topic partition.

        Makes up to three attempts, one second apart, recreating the
        consumer after each failure. Failures are logged and never raised.

        Args:
            message: Consumed record exposing ``topic``, ``partition`` and
                ``offset`` attributes.
        """
        retry_num = 1
        while True:
            try:
                self.subscribe()
                logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}", message.topic,
                            message.offset + 1, message.partition)
                tp = TopicPartition(topic=message.topic, partition=message.partition)
                self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(message.offset + 1, None))})
                logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}", message.topic,
                            message.offset + 1, message.partition)
                break
            except Exception as e:
                logger.exception("消费者提交offset异常: {}, 重试次数: {}", e, retry_num)
                self._discard_consumer()
                retry_num += 1
                if retry_num > 3:
                    logger.exception("消费者提交offset重试失败: {}", e)
                    break
                # Back off only when another attempt will follow.
                time.sleep(1)