168 lines
7.5 KiB
Python
168 lines
7.5 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
import time
|
|||
|
|
from json import dumps, loads
|
|||
|
|
from traceback import format_exc
|
|||
|
|
|
|||
|
|
from kafka import KafkaProducer, KafkaConsumer, TopicPartition, OffsetAndMetadata
|
|||
|
|
from loguru import logger
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 生产者
|
|||
|
|
class CustomerKafkaProducer:
|
|||
|
|
__slots__ = (
|
|||
|
|
'__configs',
|
|||
|
|
'customerProducer',
|
|||
|
|
'__bootstrap_servers'
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def __init__(self, kafka_config):
|
|||
|
|
self.__configs = kafka_config["producer"]
|
|||
|
|
self.__bootstrap_servers = kafka_config["bootstrap_servers"]
|
|||
|
|
self.customerProducer = None
|
|||
|
|
self.get_producer()
|
|||
|
|
|
|||
|
|
# 获取kafka生产者
|
|||
|
|
def get_producer(self):
|
|||
|
|
if self.customerProducer is None:
|
|||
|
|
logger.info("配置kafka生产者!")
|
|||
|
|
self.customerProducer = KafkaProducer(
|
|||
|
|
bootstrap_servers=self.__bootstrap_servers,
|
|||
|
|
acks=self.__configs["acks"],
|
|||
|
|
retries=self.__configs["retries"],
|
|||
|
|
linger_ms=self.__configs["linger_ms"],
|
|||
|
|
retry_backoff_ms=self.__configs["retry_backoff_ms"],
|
|||
|
|
max_in_flight_requests_per_connection=self.__configs["max_in_flight_requests_per_connection"],
|
|||
|
|
key_serializer=lambda m: dumps(m).encode("utf-8"),
|
|||
|
|
value_serializer=lambda m: dumps(m).encode("utf-8"))
|
|||
|
|
|
|||
|
|
# mode 模式1:异步发送 2:同步发送
|
|||
|
|
# def on_send_success(record_metadata): 成功回调
|
|||
|
|
# def on_send_error(exc): 失败回调
|
|||
|
|
def sender(self, topic, key, message, mode=1, customer_send_success=None, customer_send_error=None):
|
|||
|
|
retry_send_num = 0
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
self.get_producer()
|
|||
|
|
logger.info("kafka发送信息,topic:{}|key:{}|message:{}|mode:{}|requestId:{}", topic, key, message, mode,
|
|||
|
|
message.get("request_id"))
|
|||
|
|
if mode == 1:
|
|||
|
|
if not customer_send_success:
|
|||
|
|
customer_send_success = CustomerKafkaProducer.on_send_success
|
|||
|
|
if not customer_send_error:
|
|||
|
|
customer_send_error = CustomerKafkaProducer.on_send_error
|
|||
|
|
self.customerProducer.send(topic=topic, key=key, value=message) \
|
|||
|
|
.add_callback(customer_send_success, message.get("request_id")) \
|
|||
|
|
.add_errback(customer_send_error, message.get("request_id"))
|
|||
|
|
if mode == 2:
|
|||
|
|
try:
|
|||
|
|
self.customerProducer.send(topic=topic, key=key, value=message).get(timeout=30)
|
|||
|
|
logger.info("kafka同步发送信息成功, requestId:{}", message.get("request_id"))
|
|||
|
|
except Exception as ke:
|
|||
|
|
logger.error("kafka同步发送消息异常: {}, requestId:{}", format_exc(),
|
|||
|
|
message.get("request_id"))
|
|||
|
|
raise ke
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
retry_send_num += 1
|
|||
|
|
logger.error("kafka发送消息异常, 开始重试, 当前重试次数:{} requestId:{}", retry_send_num,
|
|||
|
|
message.get("request_id"))
|
|||
|
|
time.sleep(1)
|
|||
|
|
self.customerProducer = None
|
|||
|
|
if retry_send_num > 3:
|
|||
|
|
logger.error("kafka发送消息重试失败: {}, requestId:{}", format_exc(),
|
|||
|
|
message.get("request_id"))
|
|||
|
|
raise e
|
|||
|
|
|
|||
|
|
def close_producer(self):
|
|||
|
|
self.customerProducer.flush()
|
|||
|
|
self.customerProducer.close()
|
|||
|
|
logger.info("kafka生产者关闭完成!")
|
|||
|
|
|
|||
|
|
@staticmethod
|
|||
|
|
def on_send_success(requestId, record_metadata):
|
|||
|
|
logger.info("kafka异步发送信息成功,topic:{}|partition:{}|offset:{}|requestId:{}", record_metadata.topic,
|
|||
|
|
record_metadata.partition, record_metadata.offset, requestId)
|
|||
|
|
|
|||
|
|
@staticmethod
|
|||
|
|
def on_send_error(requestId, exc):
|
|||
|
|
logger.exception("kafka异步发送消息异常: {}, requestId:{}", exc, requestId)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 生产者
|
|||
|
|
class CustomerKafkaConsumer:
|
|||
|
|
__slots__ = ('__configs', 'customerConsumer', '__bootstrap_servers', '__topics')
|
|||
|
|
|
|||
|
|
def __init__(self, kafka_config, topics=()):
|
|||
|
|
logger.info("初始化消费者")
|
|||
|
|
self.__configs = kafka_config["consumer"]
|
|||
|
|
self.__bootstrap_servers = kafka_config["bootstrap_servers"]
|
|||
|
|
self.customerConsumer = None
|
|||
|
|
self.__topics = topics
|
|||
|
|
self.subscribe()
|
|||
|
|
logger.info("初始化消费者完成")
|
|||
|
|
|
|||
|
|
def subscribe(self):
|
|||
|
|
if self.customerConsumer is None:
|
|||
|
|
logger.info("获取消费者!")
|
|||
|
|
self.customerConsumer = KafkaConsumer(
|
|||
|
|
bootstrap_servers=self.__bootstrap_servers,
|
|||
|
|
# client_id=self.__configs[KAFKA_CLIENT_ID],
|
|||
|
|
group_id=self.__configs["group_id"],
|
|||
|
|
auto_offset_reset=self.__configs["auto_offset_reset"],
|
|||
|
|
enable_auto_commit=bool(self.__configs["enable_auto_commit"]),
|
|||
|
|
max_poll_records=self.__configs["max_poll_records"],
|
|||
|
|
value_deserializer=lambda m: loads(m.decode("utf-8")))
|
|||
|
|
logger.info("kafka生产者订阅topic:{}", self.__topics)
|
|||
|
|
# if self.topics is None or len(self.topics) == 0:
|
|||
|
|
# logger.error("消费者订阅topic不能为空!")
|
|||
|
|
# raise Exception("消费者订阅topic不能为空!")
|
|||
|
|
# # 手动配置分区
|
|||
|
|
# customer_partition = []
|
|||
|
|
# for topic in self.topics:
|
|||
|
|
# for p in self.content["kafka"][self.content["dsp"]["active"]][topic]["partition"]:
|
|||
|
|
# customer_partition.append(TopicPartition(topic, p))
|
|||
|
|
# self.customerConsumer.assign(customer_partition)
|
|||
|
|
# 自动配置
|
|||
|
|
self.customerConsumer.subscribe(topics=self.__topics)
|
|||
|
|
logger.info("kafka生产者订阅topic完成")
|
|||
|
|
|
|||
|
|
def poll(self):
|
|||
|
|
msg = None
|
|||
|
|
try:
|
|||
|
|
self.subscribe()
|
|||
|
|
msg = self.customerConsumer.poll()
|
|||
|
|
except Exception:
|
|||
|
|
self.customerConsumer = None
|
|||
|
|
logger.error("消费者拉取消息异常: {}", format_exc())
|
|||
|
|
return msg
|
|||
|
|
|
|||
|
|
def commit_offset(self, message, request_id,log=True):
|
|||
|
|
retry_num = 0
|
|||
|
|
topic = message.topic
|
|||
|
|
offset = message.offset + 1
|
|||
|
|
partition = message.partition
|
|||
|
|
while True:
|
|||
|
|
try:
|
|||
|
|
self.subscribe()
|
|||
|
|
if log: logger.info("消费者开始提交offset,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
|
|||
|
|
request_id)
|
|||
|
|
tp = TopicPartition(topic=topic, partition=partition)
|
|||
|
|
self.customerConsumer.commit(offsets={tp: (OffsetAndMetadata(offset, None))})
|
|||
|
|
if log: logger.info("消费者提交offset完成,topic:{}|offset:{}|partition:{}|requestId:{}", topic, offset, partition,
|
|||
|
|
request_id)
|
|||
|
|
break
|
|||
|
|
except Exception:
|
|||
|
|
self.customerConsumer = None
|
|||
|
|
if log: logger.error("消费者提交offset异常: {}, 重试次数: {}, requestId:{}", format_exc(), retry_num, request_id)
|
|||
|
|
time.sleep(1)
|
|||
|
|
retry_num += 1
|
|||
|
|
if retry_num > 3:
|
|||
|
|
if log : logger.error("消费者提交offset重试失败: {}, requestId:{}", format_exc(), request_id)
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# if __name__=="__main__":
|
|||
|
|
# try:
|
|||
|
|
# 1/0
|
|||
|
|
# except Exception as e:
|
|||
|
|
# logger.exception("aaaaa:{} {}", e, "11111")
|