# -*- coding: utf-8 -*- import time from threading import Thread from traceback import format_exc from loguru import logger from util.KafkaUtils import CustomerKafkaProducer ''' 问题反馈线程 ''' class FeedbackThread(Thread): __slots__ = [ '__fbQueue', '__context' ] def __init__(self, fbQueue, context): super().__init__() self.__fbQueue = fbQueue self.__context = context ''' 阻塞获取反馈消息 ''' def getFeedback(self): return self.__fbQueue.get() def run(self): logger.info("启动问题反馈线程") kafkaProducer = CustomerKafkaProducer(self.__context) dsp_push_stream_result_topic = self.__context["kafka"]["topic"]["dsp-push-stream-result-topic"] while True: logger.info("问题反馈发送消息循环") feedback = {} try: fb = self.getFeedback() if fb is not None and len(fb) > 0: pull_stream = fb.get("pull_stream") if pull_stream is not None and len(pull_stream) > 0: kafkaProducer.sender(dsp_push_stream_result_topic, pull_stream["requestId"], pull_stream, 1) else: time.sleep(1) except Exception: logger.error("问题反馈异常:{}, requestId:{}", format_exc(), feedback.get("request_id")) logger.info("问题反馈线程执行完成")