tuoheng_algN/util/KafkaUtils.py

168 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import time
from json import dumps, loads
from traceback import format_exc
from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
from loguru import logger
# 生产者
class CustomerKafkaProducer:
__slots__ = (
'__configs',
'customerProducer',
'__bootstrap_servers'
)
def __init__(self, kafka_config):
self.__configs = kafka_config["producer"]
self.__bootstrap_servers = kafka_config["bootstrap_servers"]
self.customerProducer = None
self.get_producer()
# 获取kafka生产者
def get_producer(self):
if self.customerProducer is None:
logger.info("配置kafka生产者!")
self.customerProducer = KafkaProducer(
bootstrap_servers=self.__bootstrap_servers,
acks=self.__configs["acks"],
retries=self.__configs["retries"],
linger_ms=self.__configs["linger_ms"],
retry_backoff_ms=self.__configs["retry_backoff_ms"],
max_in_flight_requests_per_connection=self.__configs["max_in_flight_requests_per_connection"],
key_serializer=lambda m: dumps(m).encode("utf-8"),
value_serializer=lambda m: dumps(m).encode("utf-8"))
# mode 模式1异步发送 2同步发送
# def on_send_success(record_metadata): 成功回调
# def on_send_error(exc): 失败回调
def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
retry_send_num = 0
while True:
try:
self.get_producer()
logger.info("kafka发送信息topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message, mode,
message.get("request_id"))
if mode == 1:
if not customer_send_success:
customer_send_success = CustomerKafkaProducer.on_send_success
if not customer_send_error:
customer_send_error = CustomerKafkaProducer.on_send_error
self.customerProducer.send(topic=topic, key=key, value=message) \
.add_callback(customer_send_success, message.get("request_id")) \
.add_errback(customer_send_error, message.get("request_id"))
if mode == 2:
try:
self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
logger.info("kafka同步发送信息成功, requestId:{}", message.get("request_id"))
except Exception as ke:
logger.error("kafka同步发送消息异常: {}, requestId:{}", format_exc(),
message.get("request_id"))
raise ke
break
except Exception as e:
retry_send_num += 1
logger.error("kafka发送消息异常, 开始重试, 当前重试次数:{} requestId:{}", retry_send_num,
message.get("request_id"))
time.sleep(1)
self.customerProducer = None
if retry_send_num > 3:
logger.error("kafka发送消息重试失败: {}, requestId:{}", format_exc(),
message.get("request_id"))
raise e
def close_producer(self):
self.customerProducer.flush()
self.customerProducer.close()
logger.info("kafka生产者关闭完成!")
@staticmethod
def on_send_success(requestId, record_metadata):
logger.info("kafka异步发送信息成功topic:{}|partition:{}|offset:{}|requestId:{}", record_metadata.topic,
record_metadata.partition, record_metadata.offset, requestId)
@staticmethod
def on_send_error(requestId, exc):
logger.exception("kafka异步发送消息异常: {}, requestId:{}", exc, requestId)
# 生产者
class CustomerKafkaConsumer:
__slots__ = ('__configs', 'customerConsumer', '__bootstrap_servers', '__topics')
def __init__(self, kafka_config, topics=()):
logger.info("初始化消费者")
self.__configs = kafka_config["consumer"]
self.__bootstrap_servers = kafka_config["bootstrap_servers"]
self.customerConsumer = None
self.__topics = topics
self.subscribe()
logger.info("初始化消费者完成")
def subscribe(self):
if self.customerConsumer is None:
logger.info("获取消费者!")
self.customerConsumer = KafkaConsumer(
bootstrap_servers=self.__bootstrap_servers,
# client_id=self.__configs[KAFKA_CLIENT_ID],
group_id=self.__configs["group_id"],
auto_offset_reset=self.__configs["auto_offset_reset"],
enable_auto_commit=bool(self.__configs["enable_auto_commit"]),
max_poll_records=self.__configs["max_poll_records"],
value_deserializer=lambda m: loads(m.decode("utf-8")))
logger.info("kafka生产者订阅topic:{}", self.__topics)
# if self.topics is None or len(self.topics) == 0:
# logger.error("消费者订阅topic不能为空")
# raise Exception("消费者订阅topic不能为空")
# # 手动配置分区
# customer_partition = []
# for topic in self.topics:
# for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
# customer_partition.append(TopicPartition(topic, p))
# self.customerConsumer.assign(customer_partition)
# 自动配置
self.customerConsumer.subscribe(topics=self.__topics)
logger.info("kafka生产者订阅topic完成")
def poll(self):
msg = None
try:
self.subscribe()
msg = self.customerConsumer.poll()
except Exception:
self.customerConsumer = None
logger.error("消费者拉取消息异常: {}", format_exc())
return msg
def commit_offset(self, message, request_id,log=True):
retry_num = 0
topic = message.topic
offset = message.offset + 1
partition = message.partition
while True:
try:
self.subscribe()
if log: logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
request_id)
tp = TopicPartition(topic=topic, partition=partition)
self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))})
if log: logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
request_id)
break
except Exception:
self.customerConsumer = None
if log: logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id)
time.sleep(1)
retry_num += 1
if retry_num > 3:
if log : logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
break
# if __name__=="__main__":
# try:
# 1/0
# except Exception as e:
# logger.exception("aaaaa:{} {}", e, "11111")